HBASE-21588 Procedure v2 wal splitting implementation

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f02ac310
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f02ac310
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f02ac310

Branch: refs/heads/branch-2
Commit: f02ac310d249fb972705cd6621dfe495512d2d98
Parents: 348c2df
Author: Jingyun Tian <[email protected]>
Authored: Tue Jan 8 09:49:13 2019 +0800
Committer: Jingyun Tian <[email protected]>
Committed: Tue Jan 8 17:26:58 2019 +0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/HConstants.java     |   8 +
 .../src/main/protobuf/MasterProcedure.proto     |  25 ++
 .../SplitLogWorkerCoordination.java             |   3 -
 .../ZkSplitLogWorkerCoordination.java           |   6 +-
 .../org/apache/hadoop/hbase/master/HMaster.java |  18 +
 .../hadoop/hbase/master/MasterServices.java     |   7 +
 .../hadoop/hbase/master/MasterWalManager.java   |   6 +-
 .../hadoop/hbase/master/SplitWALManager.java    | 239 ++++++++++++
 .../master/procedure/ServerCrashProcedure.java  |  76 +++-
 .../procedure/ServerProcedureInterface.java     |  13 +-
 .../hbase/master/procedure/ServerQueue.java     |   2 +
 .../master/procedure/SplitWALProcedure.java     | 199 ++++++++++
 .../procedure/SplitWALRemoteProcedure.java      | 195 ++++++++++
 .../hbase/regionserver/HRegionServer.java       |  18 +-
 .../hbase/regionserver/SplitLogWorker.java      |  99 ++---
 .../hbase/regionserver/SplitWALCallable.java    | 109 ++++++
 .../hadoop/hbase/master/AbstractTestDLS.java    |   3 +-
 .../hadoop/hbase/master/TestRestartCluster.java |  21 +
 .../hadoop/hbase/master/TestRollingRestart.java |  18 +-
 .../hbase/master/TestSplitWALManager.java       | 383 +++++++++++++++++++
 .../procedure/TestServerCrashProcedure.java     |  21 +-
 .../master/procedure/TestSplitWALProcedure.java | 133 +++++++
 .../hbase/regionserver/TestSplitLogWorker.java  |   5 +-
 23 files changed, 1537 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/f02ac310/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 7aa1494..bc184e5 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1347,6 +1347,14 @@ public final class HConstants {
   public static final String HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL =
       "hbase.client.fast.fail.interceptor.impl";
 
+  public static final String HBASE_SPLIT_WAL_COORDINATED_BY_ZK = 
"hbase.split.wal.zk.coordinated";
+
+  public static final boolean DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK = true;
+
+  public static final String HBASE_SPLIT_WAL_MAX_SPLITTER = 
"hbase.regionserver.wal.max.splitters";
+
+  public static final int DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER = 2;
+
   /** Config key for if the server should send backpressure and if the client 
should listen to
    * that backpressure from the server */
   public static final String ENABLE_CLIENT_BACKPRESSURE = 
"hbase.client.backpressure.enabled";

http://git-wip-us.apache.org/repos/asf/hbase/blob/f02ac310/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto 
b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
index f96859c..1901282 100644
--- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
@@ -308,6 +308,8 @@ enum ServerCrashState {
   SERVER_CRASH_WAIT_ON_ASSIGN = 9;
   SERVER_CRASH_SPLIT_META_LOGS = 10;
   SERVER_CRASH_ASSIGN_META = 11;
+  SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR=12;
+  SERVER_CRASH_DELETE_SPLIT_WALS_DIR=13;
   SERVER_CRASH_HANDLE_RIT2 = 20[deprecated=true];
   SERVER_CRASH_FINISH = 100;
 }
@@ -502,4 +504,27 @@ message SwitchRpcThrottleStateData {
 message SwitchRpcThrottleRemoteStateData {
   required ServerName target_server = 1;
   required bool rpc_throttle_enabled = 2;
+}
+
+message SplitWALParameter {
+  required string wal_path = 1;
+}
+
+
+message SplitWALData {
+  required string wal_path = 1;
+  required ServerName crashed_server = 2;
+  optional ServerName worker = 3;
+}
+
+message SplitWALRemoteData {
+  required string wal_path = 1;
+  required ServerName crashed_server = 2;
+  required ServerName worker = 3;
+}
+
+enum SplitWALState {
+  ACQUIRE_SPLIT_WAL_WORKER = 1;
+  DISPATCH_WAL_TO_WORKER = 2;
+  RELEASE_SPLIT_WORKER = 3;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/f02ac310/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java
index ab04f60..ad74015 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java
@@ -48,9 +48,6 @@ import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
 @InterfaceAudience.Private
 public interface SplitLogWorkerCoordination {
 
-/* SplitLogWorker part */
-  int DEFAULT_MAX_SPLITTERS = 2;
-
   /**
    * Initialize internal values. This method should be used when corresponding 
SplitLogWorker
    * instance is created

http://git-wip-us.apache.org/repos/asf/hbase/blob/f02ac310/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
index ff555f2..7ceaaec 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
@@ -19,6 +19,9 @@
 
 package org.apache.hadoop.hbase.coordination;
 
+import static 
org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
+import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
+
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -135,7 +138,8 @@ public class ZkSplitLogWorkerCoordination extends 
ZKListener implements
     this.server = server;
     this.worker = worker;
     this.splitTaskExecutor = splitExecutor;
-    maxConcurrentTasks = conf.getInt("hbase.regionserver.wal.max.splitters", 
DEFAULT_MAX_SPLITTERS);
+    maxConcurrentTasks =
+        conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 
DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER);
     reportPeriod =
         conf.getInt("hbase.splitlog.report.period",
           conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT,

http://git-wip-us.apache.org/repos/asf/hbase/blob/f02ac310/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index bd5e67a..adbae00 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -17,7 +17,9 @@
  */
 package org.apache.hadoop.hbase.master;
 
+import static 
org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
 import static 
org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
+import static 
org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
 
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.Service;
@@ -335,6 +337,13 @@ public class HMaster extends HRegionServer implements 
MasterServices {
   private MasterFileSystem fileSystemManager;
   private MasterWalManager walManager;
 
+  // Manager for procedure-based WAL splitting; null if the current WAL
+  // splitting is zk-based. SplitWALManager will replace SplitLogManager
+  // and MasterWalManager, which means the zk-based WAL splitting code will
+  // be useless after we switch to the procedure-based one. Our eventual
+  // goal is to remove all the zk-based WAL splitting code.
+  private SplitWALManager splitWALManager;
+
   // server manager to deal with region server info
   private volatile ServerManager serverManager;
 
@@ -942,6 +951,10 @@ public class HMaster extends HRegionServer implements 
MasterServices {
 
     status.setStatus("Initialize ServerManager and schedule SCP for crash 
servers");
     this.serverManager = createServerManager(this);
+    if (!conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
+      DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
+      this.splitWALManager = new SplitWALManager(this);
+    }
     createProcedureExecutor();
     @SuppressWarnings("rawtypes")
     Map<Class<? extends Procedure>, List<Procedure<MasterProcedureEnv>>> 
procsByType =
@@ -1379,6 +1392,11 @@ public class HMaster extends HRegionServer implements 
MasterServices {
   }
 
   @Override
+  public SplitWALManager getSplitWALManager() {
+    return splitWALManager;
+  }
+
+  @Override
   public TableStateManager getTableStateManager() {
     return tableStateManager;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f02ac310/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
index 537e09f..d15197f 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
@@ -495,4 +495,11 @@ public interface MasterServices extends Server {
    * @return True if cluster is up; false if cluster is not up (we are 
shutting down).
    */
   boolean isClusterUp();
+
+  /**
+   * @return null if the current WAL splitting is zk-based
+   */
+  default SplitWALManager getSplitWALManager(){
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f02ac310/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java
index 5ab1c28..fbf4594 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java
@@ -60,7 +60,8 @@ public class MasterWalManager {
     }
   };
 
-  final static PathFilter NON_META_FILTER = new PathFilter() {
+  @VisibleForTesting
+  public final static PathFilter NON_META_FILTER = new PathFilter() {
     @Override
     public boolean accept(Path p) {
       return !AbstractFSWALProvider.isMetaFile(p);
@@ -167,7 +168,6 @@ public class MasterWalManager {
 
   /**
    * @return listing of ServerNames found by parsing WAL directory paths in FS.
-   *
    */
   public Set<ServerName> getServerNamesFromWALDirPath(final PathFilter filter) 
throws IOException {
     FileStatus[] walDirForServerNames = getWALDirPaths(filter);
@@ -290,7 +290,7 @@ public class MasterWalManager {
   
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="UL_UNRELEASED_LOCK", 
justification=
       "We only release this lock when we set it. Updates to code that uses it 
should verify use " +
       "of the guard boolean.")
-  private List<Path> getLogDirs(final Set<ServerName> serverNames) throws 
IOException {
+  List<Path> getLogDirs(final Set<ServerName> serverNames) throws IOException {
     List<Path> logDirs = new ArrayList<>();
     boolean needReleaseLock = false;
     if (!this.services.isInitialized()) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/f02ac310/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java
new file mode 100644
index 0000000..fc50840
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static 
org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
+import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
+import static org.apache.hadoop.hbase.master.MasterWalManager.META_FILTER;
+import static org.apache.hadoop.hbase.master.MasterWalManager.NON_META_FILTER;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
+import org.apache.hadoop.hbase.master.procedure.SplitWALProcedure;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
+/**
+ * Create {@link SplitWALProcedure} for each WAL which need to split. Manage 
the workers for each
+ * {@link SplitWALProcedure}.
+ * Total number of workers is (number of online servers) * 
(HBASE_SPLIT_WAL_MAX_SPLITTER).
+ * Helps assign and release workers for split tasks.
+ * Provide helper method to delete split WAL file and directory.
+ *
+ * The user can get the SplitWALProcedures via splitWALs(crashedServer, 
splitMeta)
+ * can get the files that need to split via getWALsToSplit(crashedServer, 
splitMeta)
+ * can delete the splitting WAL and directory via deleteSplitWAL(wal)
+ * and deleteSplitWAL(crashedServer)
+ * can check whether splitting the WALs of a crashed server has succeeded via
+ * isSplitWALFinished(walPath)
+ * can acquire and release a worker for splitting WAL via 
acquireSplitWALWorker(procedure)
+ * and releaseSplitWALWorker(worker, scheduler)
+ *
+ * This class is to replace the zk-based WAL splitting related code, {@link 
MasterWalManager},
+ * {@link SplitLogManager}, {@link 
org.apache.hadoop.hbase.zookeeper.ZKSplitLog} and
+ * {@link org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination} 
can be removed
+ * after we switch to procedure-based WAL splitting.
+ */
[email protected]
+public class SplitWALManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SplitWALManager.class);
+
+  private final MasterServices master;
+  private final SplitWorkerAssigner splitWorkerAssigner;
+  private final Path rootDir;
+  private final FileSystem fs;
+  private final Configuration conf;
+
+  public SplitWALManager(MasterServices master) {
+    this.master = master;
+    this.conf = master.getConfiguration();
+    this.splitWorkerAssigner = new SplitWorkerAssigner(this.master,
+        conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 
DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER));
+    this.rootDir = master.getMasterFileSystem().getWALRootDir();
+    this.fs = master.getMasterFileSystem().getFileSystem();
+
+  }
+
+  public List<Procedure> splitWALs(ServerName crashedServer, boolean splitMeta)
+      throws IOException {
+    try {
+      // 1. list all splitting files
+      List<FileStatus> splittingFiles = getWALsToSplit(crashedServer, 
splitMeta);
+      // 2. create corresponding procedures
+      return createSplitWALProcedures(splittingFiles, crashedServer);
+    } catch (IOException e) {
+      LOG.error("failed to create procedures for splitting logs of {}", 
crashedServer, e);
+      throw e;
+    }
+  }
+
+  public List<FileStatus> getWALsToSplit(ServerName serverName, boolean 
splitMeta)
+      throws IOException {
+    List<Path> logDirs = 
master.getMasterWalManager().getLogDirs(Collections.singleton(serverName));
+    FileStatus[] fileStatuses =
+        SplitLogManager.getFileList(this.conf, logDirs, splitMeta ? 
META_FILTER : NON_META_FILTER);
+    LOG.info("size of WALs of {} is {}, isMeta: {}", serverName, 
fileStatuses.length, splitMeta);
+    return Lists.newArrayList(fileStatuses);
+  }
+
+  private Path getWALSplitDir(ServerName serverName) {
+    Path logDir =
+        new Path(this.rootDir, 
AbstractFSWALProvider.getWALDirectoryName(serverName.toString()));
+    return logDir.suffix(AbstractFSWALProvider.SPLITTING_EXT);
+  }
+
+  public void deleteSplitWAL(String wal) throws IOException {
+    fs.delete(new Path(wal), false);
+  }
+
+  public void deleteWALDir(ServerName serverName) throws IOException {
+    Path splitDir = getWALSplitDir(serverName);
+    fs.delete(splitDir, false);
+  }
+
+  public boolean isSplitWALFinished(String walPath) throws IOException {
+    return !fs.exists(new Path(rootDir, walPath));
+  }
+
+  @VisibleForTesting
+  List<Procedure> createSplitWALProcedures(List<FileStatus> splittingWALs,
+      ServerName crashedServer) {
+    return splittingWALs.stream()
+        .map(wal -> new SplitWALProcedure(wal.getPath().toString(), 
crashedServer))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Try to acquire a worker from the online servers for executing a split WAL task.
+   * @param procedure split WAL task
+   * @return an available region server which could execute this task
+   * @throws ProcedureSuspendedException if there is no available worker,
+   *         it will throw this exception to let the procedure wait
+   */
+  public ServerName acquireSplitWALWorker(Procedure<?> procedure)
+      throws ProcedureSuspendedException {
+    Optional<ServerName> worker = splitWorkerAssigner.acquire();
+    LOG.debug("acquired a worker {} to split a WAL", worker);
+    if (worker.isPresent()) {
+      return worker.get();
+    }
+    splitWorkerAssigner.suspend(procedure);
+    throw new ProcedureSuspendedException();
+  }
+
+  /**
+   * After the worker finished the split WAL task, it will release the worker, 
and wake up all the
+   * suspend procedures in the ProcedureEvent
+   * @param worker worker which is about to release
+   * @param scheduler scheduler which is to wake up the procedure event
+   */
+  public void releaseSplitWALWorker(ServerName worker, 
MasterProcedureScheduler scheduler) {
+    LOG.debug("release a worker {} to split a WAL", worker);
+    splitWorkerAssigner.release(worker);
+    splitWorkerAssigner.wake(scheduler);
+  }
+
+  /**
+   * When the master restarts, there will be a new splitWorkerAssigner. But if
+   * there are split WAL tasks running on the region server side, they will not
+   * be counted by the new splitWorkerAssigner. Thus we should add the workers
+   * of running tasks to the assigner when we load the procedures from
+   * MasterProcWALs.
+   * @param worker region server which is executing a split WAL task
+   */
+  public void addUsedSplitWALWorker(ServerName worker){
+    splitWorkerAssigner.addUsedWorker(worker);
+  }
+
+  /**
+   * Helps assign and release a worker for each WAL splitting task.
+   * For each worker, the number of concurrently running split tasks should be
+   * no more than maxSplitTasks. If a task fails to acquire a worker, it will
+   * suspend and wait until a worker becomes available.
+   *
+   */
+  private static final class SplitWorkerAssigner implements ServerListener {
+    private int maxSplitTasks;
+    private final ProcedureEvent<?> event;
+    private Map<ServerName, Integer> currentWorkers = new HashMap<>();
+    private MasterServices master;
+
+    public SplitWorkerAssigner(MasterServices master, int maxSplitTasks) {
+      this.maxSplitTasks = maxSplitTasks;
+      this.master = master;
+      this.event = new ProcedureEvent<>("split-WAL-worker-assigning");
+      this.master.getServerManager().registerListener(this);
+    }
+
+    public synchronized Optional<ServerName> acquire() {
+      List<ServerName> serverList = 
master.getServerManager().getOnlineServersList();
+      Collections.shuffle(serverList);
+      Optional<ServerName> worker = serverList.stream().filter(
+        serverName -> !currentWorkers.containsKey(serverName) || 
currentWorkers.get(serverName) > 0)
+          .findAny();
+      if (worker.isPresent()) {
+        currentWorkers.compute(worker.get(), (serverName,
+            availableWorker) -> availableWorker == null ? maxSplitTasks - 1 : 
availableWorker - 1);
+      }
+      return worker;
+    }
+
+    public synchronized void release(ServerName serverName) {
+      currentWorkers.compute(serverName, (k, v) -> v == null ? null : v + 1);
+    }
+
+    public void suspend(Procedure<?> proc) {
+      event.suspend();
+      event.suspendIfNotReady(proc);
+    }
+
+    public void wake(MasterProcedureScheduler scheduler) {
+      if (!event.isReady()) {
+        event.wake(scheduler);
+      }
+    }
+
+    @Override
+    public void serverAdded(ServerName worker) {
+      
this.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
+    }
+
+    public synchronized void addUsedWorker(ServerName worker) {
+      // load used worker when master restart
+      currentWorkers.compute(worker, (serverName,
+          availableWorker) -> availableWorker == null ? maxSplitTasks - 1 : 
availableWorker - 1);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/f02ac310/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
index 05bcd28..2072727 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hbase.master.procedure;
 
+import static 
org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
+import static 
org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -28,9 +31,11 @@ import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.MasterWalManager;
+import org.apache.hadoop.hbase.master.SplitWALManager;
 import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
 import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
 import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
+import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
@@ -107,6 +112,7 @@ public class ServerCrashProcedure
   protected Flow executeFromState(MasterProcedureEnv env, ServerCrashState 
state)
       throws ProcedureSuspendedException, ProcedureYieldException {
     final MasterServices services = env.getMasterServices();
+    final AssignmentManager am = env.getAssignmentManager();
     // HBASE-14802
     // If we have not yet notified that we are processing a dead server, we 
should do now.
     if (!notifiedDeadServer) {
@@ -117,6 +123,7 @@ public class ServerCrashProcedure
     switch (state) {
       case SERVER_CRASH_START:
       case SERVER_CRASH_SPLIT_META_LOGS:
+      case SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR:
       case SERVER_CRASH_ASSIGN_META:
         break;
       default:
@@ -137,8 +144,24 @@ public class ServerCrashProcedure
           }
           break;
         case SERVER_CRASH_SPLIT_META_LOGS:
-          splitMetaLogs(env);
-          setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META);
+          if 
(env.getMasterConfiguration().getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
+            DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
+            splitMetaLogs(env);
+            setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META);
+          } else {
+            am.getRegionStates().metaLogSplitting(serverName);
+            addChildProcedure(createSplittingWalProcedures(env, true));
+            
setNextState(ServerCrashState.SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR);
+          }
+          break;
+        case SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR:
+          if(isSplittingDone(env, true)){
+            cleanupSplitDir(env);
+            setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META);
+            am.getRegionStates().metaLogSplit(serverName);
+          } else {
+            setNextState(ServerCrashState.SERVER_CRASH_SPLIT_META_LOGS);
+          }
           break;
         case SERVER_CRASH_ASSIGN_META:
           assignRegions(env, 
Arrays.asList(RegionInfoBuilder.FIRST_META_REGIONINFO));
@@ -156,8 +179,24 @@ public class ServerCrashProcedure
           }
           break;
         case SERVER_CRASH_SPLIT_LOGS:
-          splitLogs(env);
-          setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
+          if 
(env.getMasterConfiguration().getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
+            DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
+            splitLogs(env);
+            setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
+          } else {
+            am.getRegionStates().logSplitting(this.serverName);
+            addChildProcedure(createSplittingWalProcedures(env, false));
+            setNextState(ServerCrashState.SERVER_CRASH_DELETE_SPLIT_WALS_DIR);
+          }
+          break;
+        case SERVER_CRASH_DELETE_SPLIT_WALS_DIR:
+          if (isSplittingDone(env, false)) {
+            cleanupSplitDir(env);
+            setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
+            am.getRegionStates().logSplit(this.serverName);
+          } else {
+            setNextState(ServerCrashState.SERVER_CRASH_SPLIT_LOGS);
+          }
           break;
         case SERVER_CRASH_ASSIGN:
           // If no regions to assign, skip assign and skip to the finish.
@@ -179,6 +218,7 @@ public class ServerCrashProcedure
           setNextState(ServerCrashState.SERVER_CRASH_FINISH);
           break;
         case SERVER_CRASH_FINISH:
+          LOG.info("removed crashed server {} after splitting done", 
serverName);
           
services.getAssignmentManager().getRegionStates().removeServer(serverName);
           services.getServerManager().getDeadServers().finish(serverName);
           return Flow.NO_MORE_STATE;
@@ -191,6 +231,34 @@ public class ServerCrashProcedure
     return Flow.HAS_MORE_STATE;
   }
 
+  private void cleanupSplitDir(MasterProcedureEnv env) {
+    SplitWALManager splitWALManager = 
env.getMasterServices().getSplitWALManager();
+    try {
+      splitWALManager.deleteWALDir(serverName);
+    } catch (IOException e) {
+      LOG.warn("remove WAL directory of server {} failed, ignore...", 
serverName, e);
+    }
+  }
+
+  private boolean isSplittingDone(MasterProcedureEnv env, boolean splitMeta) {
+    LOG.debug("check if splitting WALs of {} done? isMeta: {}", serverName, 
splitMeta);
+    SplitWALManager splitWALManager = 
env.getMasterServices().getSplitWALManager();
+    try {
+      return splitWALManager.getWALsToSplit(serverName, splitMeta).size() == 0;
+    } catch (IOException e) {
+      LOG.warn("get filelist of serverName {} failed, retry...", serverName, 
e);
+      return false;
+    }
+  }
+
+  private Procedure[] createSplittingWalProcedures(MasterProcedureEnv env, 
boolean splitMeta)
+      throws IOException {
+    LOG.info("Splitting WALs {}, isMeta: {}", this, splitMeta);
+    SplitWALManager splitWALManager = 
env.getMasterServices().getSplitWALManager();
+    List<Procedure> procedures = splitWALManager.splitWALs(serverName, 
splitMeta);
+    return procedures.toArray(new Procedure[procedures.size()]);
+  }
+
   private boolean filterDefaultMetaRegions() {
     if (regionsOnCrashedServer == null) {
       return false;

http://git-wip-us.apache.org/repos/asf/hbase/blob/f02ac310/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java
index 7549b13..8162269 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java
@@ -27,7 +27,18 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 public interface ServerProcedureInterface {
   public enum ServerOperationType {
-    CRASH_HANDLER, SWITCH_RPC_THROTTLE
+    CRASH_HANDLER, SWITCH_RPC_THROTTLE,
+    /**
+     * Help find an available region server as the worker and release the
+     * worker after the task is done. Invokes a SPLIT_WAL_REMOTE operation to
+     * send the actual WAL splitting request to the worker; manages the split
+     * WAL task flow and will retry if SPLIT_WAL_REMOTE fails.
+     */
+    SPLIT_WAL,
+
+    /**
+     * send the split WAL request to region server and handle the response
+     */
+    SPLIT_WAL_REMOTE
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/f02ac310/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java
index 9e3b311..1659ab5 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java
@@ -36,6 +36,8 @@ class ServerQueue extends Queue<ServerName> {
       case CRASH_HANDLER:
         return true;
       case SWITCH_RPC_THROTTLE:
+      case SPLIT_WAL:
+      case SPLIT_WAL_REMOTE:
         return false;
       default:
         break;

http://git-wip-us.apache.org/repos/asf/hbase/blob/f02ac310/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java
new file mode 100644
index 0000000..3b2d0d5
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java
@@ -0,0 +1,199 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.SplitWALManager;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
+
+/**
+ * The procedure is to split a WAL. It will get an available region server and
+ * schedule a {@link SplitWALRemoteProcedure} to actually send the request to 
region
+ * server to split this WAL.
+ * It also check if the split wal task really succeed. If the WAL still 
exists, it will
+ * schedule another region server to split this WAL.
+ */
[email protected]
+public class SplitWALProcedure
+    extends StateMachineProcedure<MasterProcedureEnv, 
MasterProcedureProtos.SplitWALState>
+    implements ServerProcedureInterface {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SplitWALProcedure.class);
+  private String walPath;
+  private ServerName worker;
+  private ServerName crashedServer;
+  private int attempts = 0;
+
+  public SplitWALProcedure() {
+  }
+
+  public SplitWALProcedure(String walPath, ServerName crashedServer) {
+    this.walPath = walPath;
+    this.crashedServer = crashedServer;
+  }
+
+  @Override
+  protected Flow executeFromState(MasterProcedureEnv env, 
MasterProcedureProtos.SplitWALState state)
+      throws ProcedureSuspendedException, ProcedureYieldException, 
InterruptedException {
+    SplitWALManager splitWALManager = 
env.getMasterServices().getSplitWALManager();
+    switch (state) {
+      case ACQUIRE_SPLIT_WAL_WORKER:
+        worker = splitWALManager.acquireSplitWALWorker(this);
+        
setNextState(MasterProcedureProtos.SplitWALState.DISPATCH_WAL_TO_WORKER);
+        return Flow.HAS_MORE_STATE;
+      case DISPATCH_WAL_TO_WORKER:
+        assert worker != null;
+        addChildProcedure(new SplitWALRemoteProcedure(worker, crashedServer, 
walPath));
+        setNextState(MasterProcedureProtos.SplitWALState.RELEASE_SPLIT_WORKER);
+        return Flow.HAS_MORE_STATE;
+      case RELEASE_SPLIT_WORKER:
+        boolean finished;
+        try {
+          finished = splitWALManager.isSplitWALFinished(walPath);
+        } catch (IOException ioe) {
+          long backoff = ProcedureUtil.getBackoffTimeMs(attempts++);
+          LOG.warn(
+            "Failed to check whether splitting wal {} success, wait {} seconds 
to retry",
+            walPath, backoff / 1000, ioe);
+          throw suspend(backoff);
+        }
+        splitWALManager.releaseSplitWALWorker(worker, 
env.getProcedureScheduler());
+        if (!finished) {
+          LOG.warn("Failed to split wal {} by server {}, retry...", walPath, 
worker);
+          
setNextState(MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER);
+          return Flow.HAS_MORE_STATE;
+        }
+        return Flow.NO_MORE_STATE;
+      default:
+        throw new UnsupportedOperationException("unhandled state=" + state);
+    }
+  }
+
+  @Override
+  protected void rollbackState(MasterProcedureEnv env,
+      MasterProcedureProtos.SplitWALState splitOneWalState)
+      throws IOException, InterruptedException {
+    if (splitOneWalState == getInitialState()) {
+      return;
+    }
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected MasterProcedureProtos.SplitWALState getState(int stateId) {
+    return MasterProcedureProtos.SplitWALState.forNumber(stateId);
+  }
+
+  @Override
+  protected int getStateId(MasterProcedureProtos.SplitWALState state) {
+    return state.getNumber();
+  }
+
+  @Override
+  protected MasterProcedureProtos.SplitWALState getInitialState() {
+    return MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER;
+  }
+
+  @Override
+  protected void serializeStateData(ProcedureStateSerializer serializer) 
throws IOException {
+    super.serializeStateData(serializer);
+    MasterProcedureProtos.SplitWALData.Builder builder =
+        MasterProcedureProtos.SplitWALData.newBuilder();
+    
builder.setWalPath(walPath).setCrashedServer(ProtobufUtil.toServerName(crashedServer));
+    if (worker != null) {
+      builder.setWorker(ProtobufUtil.toServerName(worker));
+    }
+    serializer.serialize(builder.build());
+  }
+
+  @Override
+  protected void deserializeStateData(ProcedureStateSerializer serializer) 
throws IOException {
+    super.deserializeStateData(serializer);
+    MasterProcedureProtos.SplitWALData data =
+        serializer.deserialize(MasterProcedureProtos.SplitWALData.class);
+    walPath = data.getWalPath();
+    crashedServer = ProtobufUtil.toServerName(data.getCrashedServer());
+    if (data.hasWorker()) {
+      worker = ProtobufUtil.toServerName(data.getWorker());
+    }
+  }
+
+  @Override
+  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
+    setState(ProcedureProtos.ProcedureState.RUNNABLE);
+    env.getProcedureScheduler().addFront(this);
+    return false;
+  }
+
+  protected final ProcedureSuspendedException suspend(long backoff)
+      throws ProcedureSuspendedException {
+    attempts++;
+    setTimeout(Math.toIntExact(backoff));
+    setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
+    skipPersistence();
+    throw new ProcedureSuspendedException();
+  }
+
+  public String getWAL() {
+    return walPath;
+  }
+
+  @VisibleForTesting
+  public ServerName getWorker(){
+    return worker;
+  }
+
+  @Override
+  public ServerName getServerName() {
+    return this.crashedServer;
+  }
+
+  @Override
+  public boolean hasMetaTableRegion() {
+    return AbstractFSWALProvider.isMetaFile(new Path(walPath));
+  }
+
+  @Override
+  public ServerOperationType getServerOperationType() {
+    return ServerOperationType.SPLIT_WAL;
+  }
+
+  @Override
+  protected void afterReplay(MasterProcedureEnv env){
+    if(worker != null){
+      
env.getMasterServices().getSplitWALManager().addUsedSplitWALWorker(worker);
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/f02ac310/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java
new file mode 100644
index 0000000..fb2dbd7
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java
@@ -0,0 +1,195 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.procedure2.NoNodeDispatchException;
+import org.apache.hadoop.hbase.procedure2.NoServerDispatchException;
+import org.apache.hadoop.hbase.procedure2.NullTargetServerDispatchException;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
+import org.apache.hadoop.hbase.regionserver.SplitWALCallable;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
+/**
+ * A remote procedure which is used to send split WAL request to region server.
+ * it will return null if the task is succeed or return a DoNotRetryIOException
+ * {@link SplitWALProcedure} will help handle the situation that encounter
+ * DoNotRetryIOException. Otherwise it will retry until succeed.
+ */
[email protected]
+public class SplitWALRemoteProcedure extends Procedure<MasterProcedureEnv>
+    implements RemoteProcedureDispatcher.RemoteProcedure<MasterProcedureEnv, 
ServerName>,
+    ServerProcedureInterface {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SplitWALRemoteProcedure.class);
+  private String walPath;
+  private ServerName worker;
+  private ServerName crashedServer;
+  private boolean dispatched;
+  private ProcedureEvent<?> event;
+  private boolean success = false;
+
+  public SplitWALRemoteProcedure() {
+  }
+
+  public SplitWALRemoteProcedure(ServerName worker, ServerName crashedServer, 
String wal) {
+    this.worker = worker;
+    this.crashedServer = crashedServer;
+    this.walPath = wal;
+  }
+
+  @Override
+  protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
+      throws ProcedureYieldException, ProcedureSuspendedException, 
InterruptedException {
+    if (dispatched) {
+      if (success) {
+        return null;
+      }
+      dispatched = false;
+    }
+    try {
+      env.getRemoteDispatcher().addOperationToNode(worker, this);
+    } catch (NoNodeDispatchException | NullTargetServerDispatchException
+        | NoServerDispatchException e) {
+      // When send to a wrong target server, it need construct a new 
SplitWALRemoteProcedure.
+      // Thus return null for this procedure and let SplitWALProcedure to 
handle this.
+      LOG.warn("dispatch WAL {} to {} failed, will retry on another server", 
walPath, worker, e);
+      return null;
+    }
+    dispatched = true;
+    event = new ProcedureEvent<>(this);
+    event.suspendIfNotReady(this);
+    throw new ProcedureSuspendedException();
+  }
+
+  @Override
+  protected void rollback(MasterProcedureEnv env) throws IOException, 
InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected boolean abort(MasterProcedureEnv env) {
+    return false;
+  }
+
+  @Override
+  protected void serializeStateData(ProcedureStateSerializer serializer) 
throws IOException {
+    MasterProcedureProtos.SplitWALRemoteData.Builder builder =
+        MasterProcedureProtos.SplitWALRemoteData.newBuilder();
+    builder.setWalPath(walPath).setWorker(ProtobufUtil.toServerName(worker))
+        .setCrashedServer(ProtobufUtil.toServerName(crashedServer));
+    serializer.serialize(builder.build());
+  }
+
+  @Override
+  protected void deserializeStateData(ProcedureStateSerializer serializer) 
throws IOException {
+    MasterProcedureProtos.SplitWALRemoteData data =
+        serializer.deserialize(MasterProcedureProtos.SplitWALRemoteData.class);
+    walPath = data.getWalPath();
+    worker = ProtobufUtil.toServerName(data.getWorker());
+    crashedServer = ProtobufUtil.toServerName(data.getCrashedServer());
+  }
+
+  @Override
+  public RemoteProcedureDispatcher.RemoteOperation 
remoteCallBuild(MasterProcedureEnv env,
+      ServerName serverName) {
+    return new RSProcedureDispatcher.ServerOperation(this, getProcId(), 
SplitWALCallable.class,
+        
MasterProcedureProtos.SplitWALParameter.newBuilder().setWalPath(walPath).build()
+            .toByteArray());
+  }
+
+  @Override
+  public void remoteCallFailed(MasterProcedureEnv env, ServerName serverName,
+      IOException exception) {
+    complete(env, exception);
+  }
+
+  @Override
+  public void remoteOperationCompleted(MasterProcedureEnv env) {
+    complete(env, null);
+  }
+
+  private void complete(MasterProcedureEnv env, Throwable error) {
+    if (event == null) {
+      LOG.warn("procedure event for {} is null, maybe the procedure is created 
when recovery",
+        getProcId());
+      return;
+    }
+    if (error == null) {
+      LOG.info("split WAL {} on {} succeeded", walPath, worker);
+      try {
+        env.getMasterServices().getSplitWALManager().deleteSplitWAL(walPath);
+      } catch (IOException e){
+        LOG.warn("remove WAL {} failed, ignore...", walPath, e);
+      }
+      success = true;
+    } else {
+      if (error instanceof DoNotRetryIOException) {
+        LOG.warn("WAL split task of {} send to a wrong server {}, will retry 
on another server",
+          walPath, worker, error);
+        success = true;
+      } else {
+        LOG.warn("split WAL {} failed, retry...", walPath, error);
+        success = false;
+      }
+
+    }
+    event.wake(env.getProcedureScheduler());
+    event = null;
+  }
+
+  @Override
+  public void remoteOperationFailed(MasterProcedureEnv env, 
RemoteProcedureException error) {
+    complete(env, error);
+  }
+
+  public String getWAL() {
+    return this.walPath;
+  }
+
+  @Override
+  public ServerName getServerName() {
+    // return the crashed server is to use the queue of root 
ServerCrashProcedure
+    return this.crashedServer;
+  }
+
+  @Override
+  public boolean hasMetaTableRegion() {
+    return AbstractFSWALProvider.isMetaFile(new Path(walPath));
+  }
+
+  @Override
+  public ServerOperationType getServerOperationType() {
+    return ServerOperationType.SPLIT_WAL_REMOTE;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/f02ac310/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 2582a78..bbae268 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -17,6 +17,11 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import static 
org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
+import static 
org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
+import static 
org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
+import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
+
 import java.io.IOException;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.lang.management.MemoryType;
@@ -88,7 +93,6 @@ import org.apache.hadoop.hbase.client.locking.EntityLock;
 import org.apache.hadoop.hbase.client.locking.LockServiceClient;
 import org.apache.hadoop.hbase.conf.ConfigurationManager;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
-import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
 import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
@@ -616,7 +620,10 @@ public class HRegionServer extends HasThread implements
           rpcServices.isa.getPort(), this, canCreateBaseZNode());
         // If no master in cluster, skip trying to track one or look for a 
cluster status.
         if (!this.masterless) {
-          this.csm = new ZkCoordinatedStateManager(this);
+          if (conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
+            DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
+            this.csm = new ZkCoordinatedStateManager(this);
+          }
 
           masterAddressTracker = new MasterAddressTracker(getZooKeeper(), 
this);
           masterAddressTracker.start();
@@ -1923,7 +1930,7 @@ public class HRegionServer extends HasThread implements
           conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
     }
     this.executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, 
conf.getInt(
-    "hbase.regionserver.wal.max.splitters", 
SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS));
+        HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER));
     // Start the threads for compacted files discharger
     
this.executorService.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER,
         
conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT,
 10));
@@ -1966,9 +1973,10 @@ public class HRegionServer extends HasThread implements
     sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
         conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 
seconds
     sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
-    if (this.csm != null) {
+    if (this.csm != null && conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
+      DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
       // SplitLogWorker needs csm. If none, don't start this.
-      this.splitLogWorker = new SplitLogWorker(this, sinkConf, this,
+      this.splitLogWorker = new SplitLogWorker(sinkConf, this,
           this, walFactory);
       splitLogWorker.start();
     } else {

http://git-wip-us.apache.org/repos/asf/hbase/blob/f02ac310/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
index a1c2030..6ecf502 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
+import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor.Status;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALSplitter;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
@@ -78,58 +79,60 @@ public class SplitLogWorker implements Runnable {
     coordination.init(server, conf, splitTaskExecutor, this);
   }
 
-  public SplitLogWorker(final Server hserver, final Configuration conf,
-      final RegionServerServices server, final LastSequenceId 
sequenceIdChecker,
-      final WALFactory factory) {
-    this(hserver, conf, server, new TaskExecutor() {
-      @Override
-      public Status exec(String filename, CancelableProgressable p) {
-        Path walDir;
-        FileSystem fs;
-        try {
-          walDir = FSUtils.getWALRootDir(conf);
-          fs = walDir.getFileSystem(conf);
-        } catch (IOException e) {
-          LOG.warn("could not find root dir or fs", e);
-          return Status.RESIGNED;
-        }
-        // TODO have to correctly figure out when log splitting has been
-        // interrupted or has encountered a transient error and when it has
-        // encountered a bad non-retry-able persistent error.
-        try {
-          if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new 
Path(walDir, filename)),
-            fs, conf, p, sequenceIdChecker,
-              
server.getCoordinatedStateManager().getSplitLogWorkerCoordination(), factory)) {
-            return Status.PREEMPTED;
-          }
-        } catch (InterruptedIOException iioe) {
-          LOG.warn("log splitting of " + filename + " interrupted, resigning", 
iioe);
-          return Status.RESIGNED;
-        } catch (IOException e) {
-          if (e instanceof FileNotFoundException) {
-            // A wal file may not exist anymore. Nothing can be recovered so 
move on
-            LOG.warn("WAL {} does not exist anymore", filename, e);
-            return Status.DONE;
-          }
-          Throwable cause = e.getCause();
-          if (e instanceof RetriesExhaustedException && (cause instanceof 
NotServingRegionException
-                  || cause instanceof ConnectException
-                  || cause instanceof SocketTimeoutException)) {
-            LOG.warn("log replaying of " + filename + " can't connect to the 
target regionserver, "
-                + "resigning", e);
-            return Status.RESIGNED;
-          } else if (cause instanceof InterruptedException) {
-            LOG.warn("log splitting of " + filename + " interrupted, 
resigning", e);
-            return Status.RESIGNED;
-          }
-          LOG.warn("log splitting of " + filename + " failed, returning 
error", e);
-          return Status.ERR;
-        }
+  public SplitLogWorker(Configuration conf, RegionServerServices server,
+      LastSequenceId sequenceIdChecker, WALFactory factory) {
+    this(server, conf, server, (f, p) -> splitLog(f, p, conf, server, 
sequenceIdChecker, factory));
+  }
+
+  static Status splitLog(String filename, CancelableProgressable p, 
Configuration conf,
+      RegionServerServices server, LastSequenceId sequenceIdChecker, 
WALFactory factory) {
+    Path walDir;
+    FileSystem fs;
+    try {
+      walDir = FSUtils.getWALRootDir(conf);
+      fs = walDir.getFileSystem(conf);
+    } catch (IOException e) {
+      LOG.warn("could not find root dir or fs", e);
+      return Status.RESIGNED;
+    }
+    // TODO have to correctly figure out when log splitting has been
+    // interrupted or has encountered a transient error and when it has
+    // encountered a bad non-retry-able persistent error.
+    try {
+      SplitLogWorkerCoordination splitLogWorkerCoordination =
+          server.getCoordinatedStateManager() == null ? null
+              : 
server.getCoordinatedStateManager().getSplitLogWorkerCoordination();
+      if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, 
filename)), fs, conf,
+        p, sequenceIdChecker, splitLogWorkerCoordination, factory)) {
+        return Status.PREEMPTED;
+      }
+    } catch (InterruptedIOException iioe) {
+      LOG.warn("log splitting of " + filename + " interrupted, resigning", 
iioe);
+      return Status.RESIGNED;
+    } catch (IOException e) {
+      if (e instanceof FileNotFoundException) {
+        // A wal file may not exist anymore. Nothing can be recovered so move 
on
+        LOG.warn("WAL {} does not exist anymore", filename, e);
         return Status.DONE;
       }
-    });
+      Throwable cause = e.getCause();
+      if (e instanceof RetriesExhaustedException && (cause instanceof 
NotServingRegionException
+          || cause instanceof ConnectException || cause instanceof 
SocketTimeoutException)) {
+        LOG.warn("log replaying of " + filename + " can't connect to the 
target regionserver, "
+            + "resigning",
+          e);
+        return Status.RESIGNED;
+      } else if (cause instanceof InterruptedException) {
+        LOG.warn("log splitting of " + filename + " interrupted, resigning", 
e);
+        return Status.RESIGNED;
+      }
+      LOG.warn("log splitting of " + filename + " failed, returning error", e);
+      return Status.ERR;
+    }
+    return Status.DONE;
   }
 
+
   @Override
   public void run() {
     try {

http://git-wip-us.apache.org/repos/asf/hbase/blob/f02ac310/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java
new file mode 100644
index 0000000..b94df22
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java
@@ -0,0 +1,109 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.concurrent.locks.Lock;
+
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
+import org.apache.hadoop.hbase.util.KeyLocker;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 
org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
+
+/**
+ * This callable is used to do the real split WAL task. It is called by
+ * {@link org.apache.hadoop.hbase.master.procedure.SplitWALRemoteProcedure} 
from master and executed
+ * by executor service which is in charge of executing the events of 
EventType.RS_LOG_REPLAY
+ *
+ * When execute this callable, it will call SplitLogWorker.splitLog() to split 
the WAL.
+ * If the return value is SplitLogWorker.TaskExecutor.Status.DONE, it means 
the task is successful
+ * and it will return null to end the call. Otherwise it will throw an 
exception and let
+ * {@link org.apache.hadoop.hbase.master.procedure.SplitWALRemoteProcedure} to 
handle this problem.
+ *
+ * This class is to replace the zk-based WAL splitting related code, {@link 
SplitLogWorker},
+ * {@link org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination} and
+ * {@link org.apache.hadoop.hbase.coordination.ZkSplitLogWorkerCoordination} 
can be removed after
+ * we switch to procedure-based WAL splitting.
+ */
[email protected]
+public class SplitWALCallable implements RSProcedureCallable {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SplitWALCallable.class);
+
+  private String walPath;
+  private Exception initError;
+  private HRegionServer rs;
+  private final KeyLocker<String> splitWALLocks = new KeyLocker<>();
+  private volatile Lock splitWALLock = null;
+
+
+  @Override
+  public void init(byte[] parameter, HRegionServer rs) {
+    try {
+      this.rs = rs;
+      MasterProcedureProtos.SplitWALParameter param =
+          MasterProcedureProtos.SplitWALParameter.parseFrom(parameter);
+      this.walPath = param.getWalPath();
+    } catch (InvalidProtocolBufferException e) {
+      LOG.error("parse proto buffer of split WAL request failed ", e);
+      initError = e;
+    }
+  }
+
+  @Override
+  public EventType getEventType() {
+    return EventType.RS_LOG_REPLAY;
+  }
+
+  @Override
+  public Void call() throws Exception {
+    if (initError != null) {
+      throw initError;
+    }
+    //grab a lock
+    splitWALLock = splitWALLocks.acquireLock(walPath);
+    try{
+      splitWal();
+      LOG.info("split WAL {} succeed.", walPath);
+    } catch (IOException e){
+      LOG.warn("failed to split WAL {}.", walPath, e);
+      throw e;
+    }
+    finally {
+      splitWALLock.unlock();
+    }
+    return null;
+  }
+
+  public String getWalPath() {
+    return this.walPath;
+  }
+
+  private void splitWal() throws IOException {
+    SplitLogWorker.TaskExecutor.Status status =
+        SplitLogWorker.splitLog(walPath, null, rs.getConfiguration(), rs, rs, 
rs.walFactory);
+    if (status != SplitLogWorker.TaskExecutor.Status.DONE) {
+      throw new IOException("Split WAL " + walPath + " failed at server ");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/f02ac310/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
index 94e9845..e722aad 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hbase.master;
 
+import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
 import static 
org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_wait_for_zk_delete;
 import static 
org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_final_transition_failed;
 import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_preempt_task;
@@ -143,7 +144,7 @@ public abstract class AbstractTestDLS {
     conf.setInt("zookeeper.recovery.retry", 0);
     conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);
     conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no 
load balancing
-    conf.setInt("hbase.regionserver.wal.max.splitters", 3);
+    conf.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 3);
     conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
     conf.set("hbase.wal.provider", getWalProvider());
     StartMiniClusterOption option = StartMiniClusterOption.builder()

http://git-wip-us.apache.org/repos/asf/hbase/blob/f02ac310/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java
index e031d34..e1a72a6 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -47,12 +49,16 @@ import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@RunWith(Parameterized.class)
 @Category({ MasterTests.class, LargeTests.class })
 public class TestRestartCluster {
 
@@ -63,6 +69,9 @@ public class TestRestartCluster {
   private static final Logger LOG = 
LoggerFactory.getLogger(TestRestartCluster.class);
   private HBaseTestingUtility UTIL = new HBaseTestingUtility();
 
+  @Parameterized.Parameter
+  public boolean splitWALCoordinatedByZK;
+
   private static final TableName[] TABLES = {
       TableName.valueOf("restartTableOne"),
       TableName.valueOf("restartTableTwo"),
@@ -70,6 +79,13 @@ public class TestRestartCluster {
   };
   private static final byte[] FAMILY = Bytes.toBytes("family");
 
+  @Before
+  public void setup() throws Exception {
+    LOG.info("WAL splitting coordinated by zk? {}", splitWALCoordinatedByZK);
+    
UTIL.getConfiguration().setBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
+      splitWALCoordinatedByZK);
+  }
+
   @After public void tearDown() throws Exception {
     UTIL.shutdownMiniCluster();
   }
@@ -304,4 +320,9 @@ public class TestRestartCluster {
       Thread.sleep(100);
     }
   }
+
+  @Parameterized.Parameters
+  public static Collection coordinatedByZK() {
+    return Arrays.asList(false, true);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f02ac310/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRollingRestart.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRollingRestart.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRollingRestart.java
index ff8ad0b..7530121 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRollingRestart.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRollingRestart.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.master;
 import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.Set;
@@ -28,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.StartMiniClusterOption;
@@ -45,6 +48,8 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,6 +58,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 /**
  * Tests the restarting of everything as done during rolling restarts.
  */
+@RunWith(Parameterized.class)
 @Category({MasterTests.class, LargeTests.class})
 public class TestRollingRestart {
 
@@ -65,6 +71,9 @@ public class TestRollingRestart {
   @Rule
   public TestName name = new TestName();
 
+  @Parameterized.Parameter
+  public boolean splitWALCoordinatedByZK;
+
   @Test
   public void testBasicRollingRestart() throws Exception {
 
@@ -78,6 +87,8 @@ public class TestRollingRestart {
     // Start the cluster
     log("Starting cluster");
     Configuration conf = HBaseConfiguration.create();
+    conf.setBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
+        splitWALCoordinatedByZK);
     HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf);
     StartMiniClusterOption option = StartMiniClusterOption.builder()
         
.numMasters(NUM_MASTERS).numRegionServers(NUM_RS).numDataNodes(NUM_RS).build();
@@ -87,7 +98,8 @@ public class TestRollingRestart {
     cluster.waitForActiveAndReadyMaster();
 
     // Create a table with regions
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName =
+        TableName.valueOf(name.getMethodName().replaceAll("[\\[|\\]]", "-"));
     byte [] family = Bytes.toBytes("family");
     log("Creating table with " + NUM_REGIONS_TO_CREATE + " regions");
     Table ht = TEST_UTIL.createMultiRegionTable(tableName, family, 
NUM_REGIONS_TO_CREATE);
@@ -282,5 +294,9 @@ public class TestRollingRestart {
   }
 
 
+  @Parameterized.Parameters
+  public static Collection coordinatedByZK() {
+    return Arrays.asList(false, true);
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/f02ac310/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java
new file mode 100644
index 0000000..9e127c0
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java
@@ -0,0 +1,383 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static 
org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
+import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
+import static 
org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface.ServerOperationType.SPLIT_WAL;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
+
+@Category({ MasterTests.class, MediumTests.class })
+
+public class TestSplitWALManager {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestSplitWALManager.class);
+
+  private static HBaseTestingUtility TEST_UTIL;
+  private HMaster master;
+  private SplitWALManager splitWALManager;
+  private TableName TABLE_NAME;
+  private byte[] FAMILY;
+
+  @Before
+  public void setup() throws Exception {
+    TEST_UTIL = new HBaseTestingUtility();
+    TEST_UTIL.getConfiguration().setBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, 
false);
+    TEST_UTIL.getConfiguration().setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 1);
+    TEST_UTIL.startMiniCluster(3);
+    master = TEST_UTIL.getHBaseCluster().getMaster();
+    splitWALManager = master.getSplitWALManager();
+    TABLE_NAME = TableName.valueOf(Bytes.toBytes("TestSplitWALManager"));
+    FAMILY = Bytes.toBytes("test");
+  }
+
+  @After
+  public void teardown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testAcquireAndRelease() throws Exception {
+    List<FakeServerProcedure> testProcedures = new ArrayList<>();
+    for (int i = 0; i < 4; i++) {
+      testProcedures.add(new FakeServerProcedure(
+          TEST_UTIL.getHBaseCluster().getServerHoldingMeta()));
+    }
+    ServerName server = 
splitWALManager.acquireSplitWALWorker(testProcedures.get(0));
+    Assert.assertNotNull(server);
+    
Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(1)));
+    
Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(2)));
+
+    Exception e = null;
+    try {
+      splitWALManager.acquireSplitWALWorker(testProcedures.get(3));
+    } catch (ProcedureSuspendedException suspendException) {
+      e = suspendException;
+    }
+    Assert.assertNotNull(e);
+    Assert.assertTrue(e instanceof ProcedureSuspendedException);
+
+    splitWALManager.releaseSplitWALWorker(server, 
TEST_UTIL.getHBaseCluster().getMaster()
+        
.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
+    
Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(3)));
+  }
+
+  @Test
+  public void testAddNewServer() throws Exception {
+    List<FakeServerProcedure> testProcedures = new ArrayList<>();
+    for (int i = 0; i < 4; i++) {
+      testProcedures.add(new FakeServerProcedure(
+          TEST_UTIL.getHBaseCluster().getServerHoldingMeta()));
+    }
+    ServerName server = 
splitWALManager.acquireSplitWALWorker(testProcedures.get(0));
+    Assert.assertNotNull(server);
+    
Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(1)));
+    
Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(2)));
+
+    Exception e = null;
+    try {
+      splitWALManager.acquireSplitWALWorker(testProcedures.get(3));
+    } catch (ProcedureSuspendedException suspendException) {
+      e = suspendException;
+    }
+    Assert.assertNotNull(e);
+    Assert.assertTrue(e instanceof ProcedureSuspendedException);
+
+    JVMClusterUtil.RegionServerThread newServer = 
TEST_UTIL.getHBaseCluster().startRegionServer();
+    newServer.waitForServerOnline();
+    
Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(3)));
+  }
+
+  @Test
+  public void testCreateSplitWALProcedures() throws Exception {
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY, 
TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE);
+    // load table
+    TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), 
FAMILY);
+    ProcedureExecutor<MasterProcedureEnv> masterPE = 
master.getMasterProcedureExecutor();
+    ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
+    Path metaWALDir = new Path(TEST_UTIL.getDefaultRootDirPath(),
+        AbstractFSWALProvider.getWALDirectoryName(metaServer.toString()));
+    // Test splitting meta wal
+    FileStatus[] wals =
+        TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, 
MasterWalManager.META_FILTER);
+    Assert.assertEquals(1, wals.length);
+    List<Procedure> testProcedures =
+        splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]), 
metaServer);
+    Assert.assertEquals(1, testProcedures.size());
+    ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0));
+    
Assert.assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath()));
+
+    // Test splitting wal
+    wals = TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, 
MasterWalManager.NON_META_FILTER);
+    Assert.assertEquals(1, wals.length);
+    testProcedures =
+        splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]), 
metaServer);
+    Assert.assertEquals(1, testProcedures.size());
+    ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0));
+    
Assert.assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath()));
+  }
+
+  @Test
+  public void testAcquireAndReleaseSplitWALWorker() throws Exception {
+    ProcedureExecutor<MasterProcedureEnv> masterPE = 
master.getMasterProcedureExecutor();
+    List<FakeServerProcedure> testProcedures = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      FakeServerProcedure procedure =
+          new 
FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(i).getServerName());
+      testProcedures.add(procedure);
+      ProcedureTestingUtility.submitProcedure(masterPE, procedure, 
HConstants.NO_NONCE,
+          HConstants.NO_NONCE);
+    }
+    TEST_UTIL.waitFor(10000, () -> testProcedures.get(2).isWorkerAcquired());
+    FakeServerProcedure failedProcedure =
+        new 
FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta());
+    ProcedureTestingUtility.submitProcedure(masterPE, failedProcedure, 
HConstants.NO_NONCE,
+        HConstants.NO_NONCE);
+    TEST_UTIL.waitFor(20000, () -> failedProcedure.isTriedToAcquire());
+    Assert.assertFalse(failedProcedure.isWorkerAcquired());
+    // let one procedure finish and release worker
+    testProcedures.get(0).countDown();
+    TEST_UTIL.waitFor(10000, () -> failedProcedure.isWorkerAcquired());
+    Assert.assertTrue(testProcedures.get(0).isSuccess());
+  }
+
+  @Test
+  public void testGetWALsToSplit() throws Exception {
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY, 
TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE);
+    // load table
+    TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), 
FAMILY);
+    ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
+    List<FileStatus> metaWals = splitWALManager.getWALsToSplit(metaServer, 
true);
+    Assert.assertEquals(1, metaWals.size());
+    List<FileStatus> wals = splitWALManager.getWALsToSplit(metaServer, false);
+    Assert.assertEquals(1, wals.size());
+    ServerName testServer = 
TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
+        .map(rs -> rs.getRegionServer().getServerName()).filter(rs -> rs != 
metaServer).findAny()
+        .get();
+    metaWals = splitWALManager.getWALsToSplit(testServer, true);
+    Assert.assertEquals(0, metaWals.size());
+  }
+
+  @Test
+  public void testSplitLogs() throws Exception {
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY, 
TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE);
+    // load table
+    TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), 
FAMILY);
+    ProcedureExecutor<MasterProcedureEnv> masterPE = 
master.getMasterProcedureExecutor();
+    ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
+    ServerName testServer = 
TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
+        .map(rs -> rs.getRegionServer().getServerName()).filter(rs -> rs != 
metaServer).findAny()
+        .get();
+    List<Procedure> procedures = splitWALManager.splitWALs(testServer, false);
+    Assert.assertEquals(1, procedures.size());
+    ProcedureTestingUtility.submitAndWait(masterPE, procedures.get(0));
+    Assert.assertEquals(0, splitWALManager.getWALsToSplit(testServer, 
false).size());
+
+    procedures = splitWALManager.splitWALs(metaServer, true);
+    Assert.assertEquals(1, procedures.size());
+    ProcedureTestingUtility.submitAndWait(masterPE, procedures.get(0));
+    Assert.assertEquals(0, splitWALManager.getWALsToSplit(metaServer, 
true).size());
+    Assert.assertEquals(1, splitWALManager.getWALsToSplit(metaServer, 
false).size());
+  }
+
+  @Test
+  public void testWorkerReloadWhenMasterRestart() throws Exception {
+    List<FakeServerProcedure> testProcedures = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      FakeServerProcedure procedure =
+          new 
FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(i).getServerName());
+      testProcedures.add(procedure);
+      
ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), 
procedure,
+        HConstants.NO_NONCE, HConstants.NO_NONCE);
+    }
+    TEST_UTIL.waitFor(10000, () -> testProcedures.get(2).isWorkerAcquired());
+    // Kill master
+    TEST_UTIL.getHBaseCluster().killMaster(master.getServerName());
+    TEST_UTIL.getHBaseCluster().waitForMasterToStop(master.getServerName(), 
20000);
+    // restart master
+    TEST_UTIL.getHBaseCluster().startMaster();
+    TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
+    this.master = TEST_UTIL.getHBaseCluster().getMaster();
+
+    FakeServerProcedure failedProcedure =
+        new 
FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta());
+    
ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), 
failedProcedure,
+      HConstants.NO_NONCE, HConstants.NO_NONCE);
+    TEST_UTIL.waitFor(20000, () -> failedProcedure.isTriedToAcquire());
+    Assert.assertFalse(failedProcedure.isWorkerAcquired());
+    for (int i = 0; i < 3; i++) {
+      testProcedures.get(i).countDown();
+    }
+    failedProcedure.countDown();
+  }
+
+  public static final class FakeServerProcedure
+      extends StateMachineProcedure<MasterProcedureEnv, 
MasterProcedureProtos.SplitWALState>
+      implements ServerProcedureInterface {
+
+    private ServerName serverName;
+    private ServerName worker;
+    private CountDownLatch barrier = new CountDownLatch(1);
+    private boolean triedToAcquire = false;
+
+    public FakeServerProcedure() {
+    }
+
+    public FakeServerProcedure(ServerName serverName) {
+      this.serverName = serverName;
+    }
+
+    public ServerName getServerName() {
+      return serverName;
+    }
+
+    @Override
+    public boolean hasMetaTableRegion() {
+      return false;
+    }
+
+    @Override
+    public ServerOperationType getServerOperationType() {
+      return SPLIT_WAL;
+    }
+
+    @Override
+    protected Flow executeFromState(MasterProcedureEnv env,
+                                    MasterProcedureProtos.SplitWALState state)
+        throws ProcedureSuspendedException, ProcedureYieldException, 
InterruptedException {
+      SplitWALManager splitWALManager = 
env.getMasterServices().getSplitWALManager();
+      switch (state) {
+        case ACQUIRE_SPLIT_WAL_WORKER:
+          triedToAcquire = true;
+          worker = splitWALManager.acquireSplitWALWorker(this);
+          
setNextState(MasterProcedureProtos.SplitWALState.DISPATCH_WAL_TO_WORKER);
+          return Flow.HAS_MORE_STATE;
+        case DISPATCH_WAL_TO_WORKER:
+          barrier.await();
+          
setNextState(MasterProcedureProtos.SplitWALState.RELEASE_SPLIT_WORKER);
+          return Flow.HAS_MORE_STATE;
+        case RELEASE_SPLIT_WORKER:
+          splitWALManager.releaseSplitWALWorker(worker, 
env.getProcedureScheduler());
+          return Flow.NO_MORE_STATE;
+        default:
+          throw new UnsupportedOperationException("unhandled state=" + state);
+      }
+    }
+
+    public boolean isWorkerAcquired() {
+      return worker != null;
+    }
+
+    public boolean isTriedToAcquire() {
+      return triedToAcquire;
+    }
+
+    public void countDown() {
+      this.barrier.countDown();
+    }
+
+    @Override
+    protected void rollbackState(MasterProcedureEnv env, 
MasterProcedureProtos.SplitWALState state)
+        throws IOException, InterruptedException {
+
+    }
+
+    @Override
+    protected MasterProcedureProtos.SplitWALState getState(int stateId) {
+      return MasterProcedureProtos.SplitWALState.forNumber(stateId);
+    }
+
+    @Override
+    protected int getStateId(MasterProcedureProtos.SplitWALState state) {
+      return state.getNumber();
+    }
+
+    @Override
+    protected MasterProcedureProtos.SplitWALState getInitialState() {
+      return MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER;
+    }
+
+    @Override
+    protected boolean holdLock(MasterProcedureEnv env) {
+      return true;
+    }
+
+    @Override
+    protected void rollback(MasterProcedureEnv env) throws IOException, 
InterruptedException {
+
+    }
+
+    @Override
+    protected boolean abort(MasterProcedureEnv env) {
+      return false;
+    }
+
+    @Override
+    protected void serializeStateData(ProcedureStateSerializer serializer) 
throws IOException {
+      MasterProcedureProtos.SplitWALData.Builder builder =
+          MasterProcedureProtos.SplitWALData.newBuilder();
+      
builder.setWalPath("test").setCrashedServer(ProtobufUtil.toServerName(serverName));
+      serializer.serialize(builder.build());
+    }
+
+    @Override
+    protected void deserializeStateData(ProcedureStateSerializer serializer) 
throws IOException {
+      MasterProcedureProtos.SplitWALData data =
+          serializer.deserialize(MasterProcedureProtos.SplitWALData.class);
+      serverName = ProtobufUtil.toServerName(data.getCrashedServer());
+    }
+  }
+}

Reply via email to