Repository: hbase
Updated Branches:
  refs/heads/master 9e824cda1 -> 55cecc967


http://git-wip-us.apache.org/repos/asf/hbase/blob/55cecc96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java
index 4ac800e..9bfdeed 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.SplitLogTask;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.master.SplitLogManager;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
 import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor.Status;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
@@ -55,11 +56,12 @@ public class HLogSplitterHandler extends EventHandler {
   private final AtomicInteger inProgressTasks;
   private final MutableInt curTaskZKVersion;
   private final TaskExecutor splitTaskExecutor;
+  private final RecoveryMode mode;
 
   public HLogSplitterHandler(final Server server, String curTask,
       final MutableInt curTaskZKVersion,
       CancelableProgressable reporter,
-      AtomicInteger inProgressTasks, TaskExecutor splitTaskExecutor) {
+      AtomicInteger inProgressTasks, TaskExecutor splitTaskExecutor, RecoveryMode mode) {
          super(server, EventType.RS_LOG_REPLAY);
     this.curTask = curTask;
     this.wal = ZKSplitLog.getFileName(curTask);
@@ -70,16 +72,17 @@ public class HLogSplitterHandler extends EventHandler {
     this.zkw = server.getZooKeeper();
     this.curTaskZKVersion = curTaskZKVersion;
     this.splitTaskExecutor = splitTaskExecutor;
+    this.mode = mode;
   }
 
   @Override
   public void process() throws IOException {
     long startTime = System.currentTimeMillis();
     try {
-      Status status = this.splitTaskExecutor.exec(wal, reporter);
+      Status status = this.splitTaskExecutor.exec(wal, mode, reporter);
       switch (status) {
       case DONE:
-        endTask(zkw, new SplitLogTask.Done(this.serverName),
+        endTask(zkw, new SplitLogTask.Done(this.serverName, this.mode),
          SplitLogCounters.tot_wkr_task_done, curTask, curTaskZKVersion.intValue());
         break;
       case PREEMPTED:
@@ -88,7 +91,7 @@ public class HLogSplitterHandler extends EventHandler {
         break;
       case ERR:
         if (server != null && !server.isStopped()) {
-          endTask(zkw, new SplitLogTask.Err(this.serverName),
+          endTask(zkw, new SplitLogTask.Err(this.serverName, this.mode),
            SplitLogCounters.tot_wkr_task_err, curTask, curTaskZKVersion.intValue());
           break;
         }
@@ -99,7 +102,7 @@ public class HLogSplitterHandler extends EventHandler {
         if (server != null && server.isStopped()) {
           LOG.info("task execution interrupted because worker is exiting " + 
curTask);
         }
-        endTask(zkw, new SplitLogTask.Resigned(this.serverName),
+        endTask(zkw, new SplitLogTask.Resigned(this.serverName, this.mode),
          SplitLogCounters.tot_wkr_task_resigned, curTask, curTaskZKVersion.intValue());
         break;
       }
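
For context, a minimal sketch of the TaskExecutor contract that process() now invokes: the only change assumed here is the updated exec(...) signature carrying the RecoveryMode, as shown in this patch; the class name NoOpSplitTaskExecutor is hypothetical and not part of the commit.

import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor.Status;
import org.apache.hadoop.hbase.util.CancelableProgressable;

// Hypothetical example implementor, for illustration only.
class NoOpSplitTaskExecutor implements TaskExecutor {
  @Override
  public Status exec(String wal, RecoveryMode mode, CancelableProgressable reporter) {
    // A real executor would split or replay the WAL named by 'wal' here,
    // periodically calling reporter.progress() and honoring the requested mode.
    return Status.DONE;
  }
}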

http://git-wip-us.apache.org/repos/asf/hbase/blob/55cecc96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
index b9f82b4..0ce4a64 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
@@ -94,6 +94,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.StoreSequenceId;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.LastSequenceId;
@@ -172,7 +173,7 @@ public class HLogSplitter {
 
   HLogSplitter(Configuration conf, Path rootDir,
       FileSystem fs, LastSequenceId idChecker, ZooKeeperWatcher zkw,
-      CoordinatedStateManager csm) {
+      CoordinatedStateManager csm, RecoveryMode mode) {
     this.conf = HBaseConfiguration.create(conf);
     String codecClassName = conf
         .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
@@ -190,7 +191,7 @@ public class HLogSplitter {
     // a larger minBatchSize may slow down recovery because replay writer has to wait for
     // enough edits before replaying them
     this.minBatchSize = this.conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 64);
-    this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(this.conf);
+    this.distributedLogReplay = (RecoveryMode.LOG_REPLAY == mode);
 
     this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
     if (zkw != null && csm != null && this.distributedLogReplay) {
@@ -224,9 +225,8 @@ public class HLogSplitter {
    */
   public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
       Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
-      ZooKeeperWatcher zkw, CoordinatedStateManager cp) throws IOException {
-    HLogSplitter s = new HLogSplitter(conf, rootDir, fs, idChecker, zkw,
-      cp);
+      ZooKeeperWatcher zkw, CoordinatedStateManager cp, RecoveryMode mode) throws IOException {
+    HLogSplitter s = new HLogSplitter(conf, rootDir, fs, idChecker, zkw, cp, mode);
     return s.splitLogFile(logfile, reporter);
   }
 
@@ -240,7 +240,8 @@ public class HLogSplitter {
     List<Path> splits = new ArrayList<Path>();
     if (logfiles != null && logfiles.length > 0) {
       for (FileStatus logfile: logfiles) {
-        HLogSplitter s = new HLogSplitter(conf, rootDir, fs, null, null, null);
+        HLogSplitter s = new HLogSplitter(conf, rootDir, fs, null, null, null, 
+          RecoveryMode.LOG_SPLITTING);
         if (s.splitLogFile(logfile, null)) {
           finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
           if (s.outputSink.splits != null) {
@@ -1980,20 +1981,4 @@ public class HLogSplitter {
 
     return mutations;
   }
-
-  /**
-   * Returns if distributed log replay is turned on or not
-   * @param conf
-   * @return true when distributed log replay is turned on
-   */
-  public static boolean isDistributedLogReplay(Configuration conf) {
-    boolean dlr = conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
-      HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
-    int version = conf.getInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Distributed log replay=" + dlr + ", " + HFile.FORMAT_VERSION_KEY + "=" + version);
-    }
-    // For distributed log replay, hfile version must be 3 at least; we need tag support.
-    return dlr && (version >= 3);
-  }
 }
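
With isDistributedLogReplay(Configuration) removed above, callers resolve the recovery mode themselves and hand it to HLogSplitter; the test setUp() methods later in this patch all use the same conf-driven pattern. A minimal sketch of that derivation, assuming only HConstants.DISTRIBUTED_LOG_REPLAY_KEY and the RecoveryMode enum introduced here (the helper class name is hypothetical, and the tests default the flag to false as shown below):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;

// Hypothetical helper mirroring the pattern repeated in the test setUp() methods below.
final class RecoveryModeHelper {
  static RecoveryMode fromConf(Configuration conf) {
    // LOG_REPLAY only when distributed log replay is enabled; otherwise classic log splitting.
    return conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false)
        ? RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING;
  }
}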

http://git-wip-us.apache.org/repos/asf/hbase/blob/55cecc96/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSerialization.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSerialization.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSerialization.java
index d7082ed..b46ec28 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSerialization.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSerialization.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.filter.RowFilter;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -120,7 +121,8 @@ public class TestSerialization {
 
   @Test
   public void testSplitLogTask() throws DeserializationException {
-    SplitLogTask slt = new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"));
+    SplitLogTask slt = new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), 
+      RecoveryMode.LOG_REPLAY);
     byte [] bytes = slt.toByteArray();
     SplitLogTask sltDeserialized = SplitLogTask.parseFrom(bytes);
     assertTrue(slt.equals(sltDeserialized));

http://git-wip-us.apache.org/repos/asf/hbase/blob/55cecc96/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
index 2e74281..9fc0565 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
@@ -68,7 +68,7 @@ public class TestMultiParallel {
   private static final byte[] ONE_ROW = Bytes.toBytes("xxx");
   private static final byte [][] KEYS = makeKeys();
 
-  private static final int slaves = 3; // also used for testing HTable pool size
+  private static final int slaves = 5; // also used for testing HTable pool size
 
   @BeforeClass public static void beforeClass() throws Exception {
     ((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL);
@@ -692,4 +692,4 @@ public class TestMultiParallel {
       validateEmpty(result);
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/55cecc96/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
index f5e8952..eabb813 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
@@ -75,6 +75,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Table;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -660,10 +661,14 @@ public class TestAssignmentManager {
     DeadServer deadServers = new DeadServer();
     deadServers.add(SERVERNAME_A);
     // I need a services instance that will return the AM
+    MasterFileSystem fs = Mockito.mock(MasterFileSystem.class);
+    Mockito.doNothing().when(fs).setLogRecoveryMode();
+    Mockito.when(fs.getLogRecoveryMode()).thenReturn(RecoveryMode.LOG_REPLAY);
     MasterServices services = Mockito.mock(MasterServices.class);
     Mockito.when(services.getAssignmentManager()).thenReturn(am);
     Mockito.when(services.getServerManager()).thenReturn(this.serverManager);
     Mockito.when(services.getZooKeeper()).thenReturn(this.watcher);
+    Mockito.when(services.getMasterFileSystem()).thenReturn(fs);
     ServerShutdownHandler handler = new ServerShutdownHandler(this.server,
       services, deadServers, SERVERNAME_A, false);
     am.failoverCleanupDone.set(true);

http://git-wip-us.apache.org/repos/asf/hbase/blob/55cecc96/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFileSystem.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFileSystem.java
index 2fbb849..8f57ee4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFileSystem.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFileSystem.java
@@ -94,8 +94,8 @@ public class TestMasterFileSystem {
     // Create a ZKW to use in the test
     ZooKeeperWatcher zkw = HBaseTestingUtility.getZooKeeperWatcher(UTIL);
     zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, walPath),
-      new SplitLogTask.Owned(inRecoveryServerName).toByteArray(), Ids.OPEN_ACL_UNSAFE,
-      CreateMode.PERSISTENT);
+      new SplitLogTask.Owned(inRecoveryServerName, fs.getLogRecoveryMode()).toByteArray(), 
+        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
     String staleRegionPath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, staleRegion);
     ZKUtil.createWithParents(zkw, staleRegionPath);
     String inRecoveringRegionPath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, failedRegion);

http://git-wip-us.apache.org/repos/asf/hbase/blob/55cecc96/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
index ed51484..ceb6ada 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
@@ -48,16 +48,21 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.SplitLogCounters;
 import org.apache.hadoop.hbase.SplitLogTask;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.master.SplitLogManager.Task;
 import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
 import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -90,6 +95,7 @@ public class TestSplitLogManager {
   private SplitLogManager slm;
   private Configuration conf;
   private int to;
+  private RecoveryMode mode;
 
   private static HBaseTestingUtility TEST_UTIL;
 
@@ -134,6 +140,9 @@ public class TestSplitLogManager {
     conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to);
     conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
     to = to + 4 * 100;
+    
+    this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? 
+        RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
   }
 
   @After
@@ -213,7 +222,7 @@ public class TestSplitLogManager {
     LOG.info("TestOrphanTaskAcquisition");
 
     String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
-    SplitLogTask slt = new SplitLogTask.Owned(DUMMY_MASTER);
+    SplitLogTask slt = new SplitLogTask.Owned(DUMMY_MASTER, this.mode);
     zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT);
 
@@ -238,7 +247,7 @@ public class TestSplitLogManager {
         " startup");
     String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
     //create an unassigned orphan task
-    SplitLogTask slt = new SplitLogTask.Unassigned(DUMMY_MASTER);
+    SplitLogTask slt = new SplitLogTask.Unassigned(DUMMY_MASTER, this.mode);
     zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT);
     int version = ZKUtil.checkExists(zkw, tasknode);
@@ -274,19 +283,19 @@ public class TestSplitLogManager {
     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
     final ServerName worker2 = ServerName.valueOf("worker2,1,1");
     final ServerName worker3 = ServerName.valueOf("worker3,1,1");
-    SplitLogTask slt = new SplitLogTask.Owned(worker1);
+    SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
     waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
     waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
     int version1 = ZKUtil.checkExists(zkw, tasknode);
     assertTrue(version1 > version);
-    slt = new SplitLogTask.Owned(worker2);
+    slt = new SplitLogTask.Owned(worker2, this.mode);
     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
     waitForCounter(tot_mgr_heartbeat, 1, 2, to/2);
     waitForCounter(tot_mgr_resubmit, 1, 2, to + to/2);
     int version2 = ZKUtil.checkExists(zkw, tasknode);
     assertTrue(version2 > version1);
-    slt = new SplitLogTask.Owned(worker3);
+    slt = new SplitLogTask.Owned(worker3, this.mode);
     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
     waitForCounter(tot_mgr_heartbeat, 2, 3, to/2);
     waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + to/2);
@@ -304,7 +313,7 @@ public class TestSplitLogManager {
     String tasknode = submitTaskAndWait(batch, "foo/1");
     int version = ZKUtil.checkExists(zkw, tasknode);
     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
-    SplitLogTask slt = new SplitLogTask.Owned(worker1);
+    SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
     waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
     waitForCounter(new Expr() {
@@ -331,7 +340,7 @@ public class TestSplitLogManager {
     TaskBatch batch = new TaskBatch();
     String tasknode = submitTaskAndWait(batch, "foo/1");
     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
-    SplitLogTask slt = new SplitLogTask.Done(worker1);
+    SplitLogTask slt = new SplitLogTask.Done(worker1, this.mode);
     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
     synchronized (batch) {
       while (batch.installed != batch.done) {
@@ -352,7 +361,7 @@ public class TestSplitLogManager {
 
     String tasknode = submitTaskAndWait(batch, "foo/1");
     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
-    SplitLogTask slt = new SplitLogTask.Err(worker1);
+    SplitLogTask slt = new SplitLogTask.Err(worker1, this.mode);
     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
 
     synchronized (batch) {
@@ -376,7 +385,7 @@ public class TestSplitLogManager {
     assertEquals(tot_mgr_resubmit.get(), 0);
     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
     assertEquals(tot_mgr_resubmit.get(), 0);
-    SplitLogTask slt = new SplitLogTask.Resigned(worker1);
+    SplitLogTask slt = new SplitLogTask.Resigned(worker1, this.mode);
     assertEquals(tot_mgr_resubmit.get(), 0);
     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
     int version = ZKUtil.checkExists(zkw, tasknode);
@@ -399,7 +408,7 @@ public class TestSplitLogManager {
     // create an orphan task in OWNED state
     String tasknode1 = ZKSplitLog.getEncodedNodeName(zkw, "orphan/1");
     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
-    SplitLogTask slt = new SplitLogTask.Owned(worker1);
+    SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
     zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT);
 
@@ -414,7 +423,7 @@ public class TestSplitLogManager {
     for (int i = 0; i < (3 * to)/100; i++) {
       Thread.sleep(100);
       final ServerName worker2 = ServerName.valueOf("worker1,1,1");
-      slt = new SplitLogTask.Owned(worker2);
+      slt = new SplitLogTask.Owned(worker2, this.mode);
       ZKUtil.setData(zkw, tasknode1, slt.toByteArray());
     }
 
@@ -438,7 +447,7 @@ public class TestSplitLogManager {
     String tasknode = submitTaskAndWait(batch, "foo/1");
     int version = ZKUtil.checkExists(zkw, tasknode);
     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
-    SplitLogTask slt = new SplitLogTask.Owned(worker1);
+    SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
     if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
     slm.handleDeadWorker(worker1);
@@ -463,7 +472,7 @@ public class TestSplitLogManager {
     String tasknode = submitTaskAndWait(batch, "foo/1");
     final ServerName worker1 = ServerName.valueOf("worker1,1,1");
 
-    SplitLogTask slt = new SplitLogTask.Owned(worker1);
+    SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
     if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
 
@@ -513,4 +522,25 @@ public class TestSplitLogManager {
 
     assertTrue("Recovery regions isn't cleaned", recoveringRegions.isEmpty());
   }
+  
+  @Test(timeout=60000)
+  public void testGetPreviousRecoveryMode() throws Exception {
+    LOG.info("testGetPreviousRecoveryMode");
+    SplitLogCounters.resetCounters();
+    Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
+    testConf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
+
+    zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"),
+      new SplitLogTask.Unassigned(
+        ServerName.valueOf("mgr,1,1"), RecoveryMode.LOG_SPLITTING).toByteArray(),
+        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+    slm = new SplitLogManager(zkw, testConf, stopper, master, DUMMY_MASTER);
+    assertTrue(slm.getRecoveryMode() == RecoveryMode.LOG_SPLITTING);
+    
+    zkw.getRecoverableZooKeeper().delete(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"), -1);
+    slm.setRecoveryMode(false);
+    assertTrue(slm.getRecoveryMode() == RecoveryMode.LOG_REPLAY);
+  }
+  
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/55cecc96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
index 2d94a77..10f2957 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerNoMaster.java
@@ -132,7 +132,7 @@ public class TestRegionServerNoMaster {
     ZKAssign.createNodeOffline(HTU.getZooKeeperWatcher(), hri, getRS().getServerName());
     // first version is '0'
     AdminProtos.OpenRegionRequest orr =
-      RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, 0, null);
+      RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, 0, null, null);
     AdminProtos.OpenRegionResponse responseOpen = getRS().rpcServices.openRegion(null, orr);
     Assert.assertTrue(responseOpen.getOpeningStateCount() == 1);
     Assert.assertTrue(responseOpen.getOpeningState(0).
@@ -251,7 +251,8 @@ public class TestRegionServerNoMaster {
 
     // We're sending multiple requests in a row. The region server must handle this nicely.
     for (int i = 0; i < 10; i++) {
-      AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(getRS().getServerName(), hri, 0, null);
+      AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(
+        getRS().getServerName(), hri, 0, null, null);
       AdminProtos.OpenRegionResponse responseOpen = getRS().rpcServices.openRegion(null, orr);
       Assert.assertTrue(responseOpen.getOpeningStateCount() == 1);
 
@@ -277,7 +278,7 @@ public class TestRegionServerNoMaster {
       // fake region to be closing now, need to clear state afterwards
       getRS().regionsInTransitionInRS.put(hri.getEncodedNameAsBytes(), Boolean.FALSE);
       AdminProtos.OpenRegionRequest orr =
-        RequestConverter.buildOpenRegionRequest(sn, hri, 0, null);
+        RequestConverter.buildOpenRegionRequest(sn, hri, 0, null, null);
       getRS().rpcServices.openRegion(null, orr);
       Assert.fail("The closing region should not be opened");
     } catch (ServiceException se) {
@@ -454,7 +455,8 @@ public class TestRegionServerNoMaster {
     //actual close
     closeNoZK();
     try {
-      AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(earlierServerName, hri, 0, null);
+      AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(
+        earlierServerName, hri, 0, null, null);
       getRS().getRSRpcServices().openRegion(null, orr);
       Assert.fail("The openRegion should have been rejected");
     } catch (ServiceException se) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/55cecc96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
index eaf5547..dcb1e88 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -31,6 +32,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SplitLogCounters;
@@ -38,6 +40,8 @@ import org.apache.hadoop.hbase.SplitLogTask;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.executor.ExecutorType;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
+import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -64,6 +68,7 @@ public class TestSplitLogWorker {
   private ZooKeeperWatcher zkw;
   private SplitLogWorker slw;
   private ExecutorService executorService;
+  private RecoveryMode mode;
 
   private void waitForCounter(AtomicLong ctr, long oldval, long newval, long timems)
       throws Exception {
@@ -98,6 +103,7 @@ public class TestSplitLogWorker {
   @Before
   public void setup() throws Exception {
     TEST_UTIL.startMiniZKCluster();
+    Configuration conf = TEST_UTIL.getConfiguration();
     zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
         "split-log-worker-tests", null);
     ZKUtil.deleteChildrenRecursively(zkw, zkw.baseZNode);
@@ -112,6 +118,8 @@ public class TestSplitLogWorker {
     SplitLogCounters.resetCounters();
     executorService = new ExecutorService("TestSplitLogWorker");
     executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, 10);
+    this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? 
+        RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
   }
 
   @After
@@ -126,7 +134,7 @@ public class TestSplitLogWorker {
     new SplitLogWorker.TaskExecutor() {
 
       @Override
-      public Status exec(String name, CancelableProgressable p) {
+      public Status exec(String name, RecoveryMode mode, CancelableProgressable p) {
         while (true) {
           try {
             Thread.sleep(1000);
@@ -149,7 +157,8 @@ public class TestSplitLogWorker {
     final ServerName RS = ServerName.valueOf("rs,1,1");
     RegionServerServices mockedRS = getRegionServer(RS);
     zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS),
-      new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(), Ids.OPEN_ACL_UNSAFE,
+      new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(), 
+        Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT);
 
     SplitLogWorker slw =
@@ -184,8 +193,8 @@ public class TestSplitLogWorker {
     final ServerName SVR1 = ServerName.valueOf("svr1,1,1");
     final ServerName SVR2 = ServerName.valueOf("svr2,1,1");
     zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TRFT),
-      new SplitLogTask.Unassigned(MANAGER).toByteArray(), Ids.OPEN_ACL_UNSAFE,
-        CreateMode.PERSISTENT);
+      new SplitLogTask.Unassigned(MANAGER, this.mode).toByteArray(), 
+        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
     RegionServerServices mockedRS1 = getRegionServer(SVR1);
     RegionServerServices mockedRS2 = getRegionServer(SVR2);
     SplitLogWorker slw1 =
@@ -227,15 +236,15 @@ public class TestSplitLogWorker {
 
       // this time create a task node after starting the splitLogWorker
       zkw.getRecoverableZooKeeper().create(PATH,
-        new SplitLogTask.Unassigned(MANAGER).toByteArray(), Ids.OPEN_ACL_UNSAFE,
-        CreateMode.PERSISTENT);
+        new SplitLogTask.Unassigned(MANAGER, this.mode).toByteArray(), 
+        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 
       waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
       assertEquals(1, slw.taskReadySeq);
       byte [] bytes = ZKUtil.getData(zkw, PATH);
       SplitLogTask slt = SplitLogTask.parseFrom(bytes);
       assertTrue(slt.isOwned(SRV));
-      slt = new SplitLogTask.Owned(MANAGER);
+      slt = new SplitLogTask.Owned(MANAGER, this.mode);
       ZKUtil.setData(zkw, PATH, slt.toByteArray());
       waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
     } finally {
@@ -258,7 +267,8 @@ public class TestSplitLogWorker {
       Thread.sleep(100);
       waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME);
 
-      SplitLogTask unassignedManager = new SplitLogTask.Unassigned(MANAGER);
+      SplitLogTask unassignedManager = 
+        new SplitLogTask.Unassigned(MANAGER, this.mode);
       zkw.getRecoverableZooKeeper().create(PATH1, unassignedManager.toByteArray(),
         Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 
@@ -272,7 +282,7 @@ public class TestSplitLogWorker {
 
       // preempt the first task, have it owned by another worker
       final ServerName anotherWorker = ServerName.valueOf("another-worker,1,1");
-      SplitLogTask slt = new SplitLogTask.Owned(anotherWorker);
+      SplitLogTask slt = new SplitLogTask.Owned(anotherWorker, this.mode);
       ZKUtil.setData(zkw, PATH1, slt.toByteArray());
       waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
 
@@ -298,7 +308,7 @@ public class TestSplitLogWorker {
     Thread.sleep(100);
 
     String task = ZKSplitLog.getEncodedNodeName(zkw, "task");
-    SplitLogTask slt = new SplitLogTask.Unassigned(MANAGER);
+    SplitLogTask slt = new SplitLogTask.Unassigned(MANAGER, this.mode);
     zkw.getRecoverableZooKeeper().create(task,slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
       CreateMode.PERSISTENT);
 
@@ -351,8 +361,8 @@ public class TestSplitLogWorker {
 
     for (int i = 0; i < maxTasks; i++) {
       zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
-        new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(),
-        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(),
+          Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
     }
 
     SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask);
@@ -394,9 +404,8 @@ public class TestSplitLogWorker {
 
     for (int i = 0; i < maxTasks; i++) {
       zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
-        new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(),
-        Ids.OPEN_ACL_UNSAFE,
-        CreateMode.PERSISTENT);
+        new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(),
+          Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
     }
 
     SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask);

http://git-wip-us.apache.org/repos/asf/hbase/blob/55cecc96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
index f03c81d..548f4d5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
@@ -444,6 +444,7 @@ public class TestSplitTransactionOnCluster {
       AssignmentManager.TEST_SKIP_SPLIT_HANDLING = false;
       admin.setBalancerRunning(true, false);
       cluster.getMaster().setCatalogJanitorEnabled(true);
+      cluster.startRegionServer();
       t.close();
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/55cecc96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
index d376c29..cdf71f6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.EntryBuffers;
 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.RegionEntryBuffer;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -111,8 +112,10 @@ public class TestHLogMethods {
   @Test
   public void testEntrySink() throws Exception {
     Configuration conf = new Configuration();
+    RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? 
+      RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
     HLogSplitter splitter = new HLogSplitter(
-      conf, mock(Path.class), mock(FileSystem.class), null, null, null);
+      conf, mock(Path.class), mock(FileSystem.class), null, null, null, mode);
 
     EntryBuffers sink = splitter.new EntryBuffers(1*1024*1024);
     for (int i = 0; i < 1000; i++) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/55cecc96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
index e6495c4..dc39415 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
@@ -127,6 +128,7 @@ public class TestHLogSplit {
   private static String ROBBER;
   private static String ZOMBIE;
   private static String [] GROUP = new String [] {"supergroup"};
+  private RecoveryMode mode;
 
   static enum Corruptions {
     INSERT_GARBAGE_ON_FIRST_LINE,
@@ -177,6 +179,8 @@ public class TestHLogSplit {
     REGIONS.clear();
     Collections.addAll(REGIONS, "bbb", "ccc");
     InstrumentedSequenceFileLogWriter.activateFailure = false;
+    this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? 
+        RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
   }
 
   @After
@@ -805,7 +809,7 @@ public class TestHLogSplit {
       logfiles != null && logfiles.length > 0);
     // Set up a splitter that will throw an IOE on the output side
     HLogSplitter logSplitter = new HLogSplitter(
-        conf, HBASEDIR, fs, null, null, null) {
+        conf, HBASEDIR, fs, null, null, null, this.mode) {
       protected HLog.Writer createWriter(FileSystem fs,
           Path logfile, Configuration conf) throws IOException {
         HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class);
@@ -938,7 +942,7 @@ public class TestHLogSplit {
     try {
       conf.setInt("hbase.splitlog.report.period", 1000);
       boolean ret = HLogSplitter.splitLogFile(
-        HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, null);
+        HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, null, this.mode);
       assertFalse("Log splitting should failed", ret);
       assertTrue(count.get() > 0);
     } catch (IOException e) {
@@ -997,7 +1001,7 @@ public class TestHLogSplit {
 
     // Create a splitter that reads and writes the data without touching disk
     HLogSplitter logSplitter = new HLogSplitter(
-        localConf, HBASEDIR, fs, null, null, null) {
+        localConf, HBASEDIR, fs, null, null, null, this.mode) {
 
       /* Produce a mock writer that doesn't write anywhere */
       protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
@@ -1282,7 +1286,7 @@ public class TestHLogSplit {
       logfiles != null && logfiles.length > 0);
 
     HLogSplitter logSplitter = new HLogSplitter(
-        conf, HBASEDIR, fs, null, null, null) {
+        conf, HBASEDIR, fs, null, null, null, this.mode) {
       protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
       throws IOException {
         HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, logfile, conf);

http://git-wip-us.apache.org/repos/asf/hbase/blob/55cecc96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
index ad889c6..ea368df 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
 import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
 import org.apache.hadoop.hbase.regionserver.FlushRequestListener;
@@ -98,6 +99,8 @@ public class TestWALReplay {
   private Path logDir;
   private FileSystem fs;
   private Configuration conf;
+  private RecoveryMode mode;
+  
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
@@ -128,6 +131,8 @@ public class TestWALReplay {
     if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
       TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
     }
+    this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? 
+        RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
   }
 
   @After
@@ -873,7 +878,7 @@ public class TestWALReplay {
     wal.close();
     FileStatus[] listStatus = this.fs.listStatus(wal.getDir());
     HLogSplitter.splitLogFile(hbaseRootDir, listStatus[0],
-      this.fs, this.conf, null, null, null, null);
+      this.fs, this.conf, null, null, null, null, mode);
     FileStatus[] listStatus1 = this.fs.listStatus(
         new Path(FSUtils.getTableDir(hbaseRootDir, tableName),
             new Path(hri.getEncodedName(), "recovered.edits")));

Reply via email to