HBASE-19747 Introduce a special WALProvider for synchronous replication

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e339f3b4
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e339f3b4
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e339f3b4

Branch: refs/heads/HBASE-19064
Commit: e339f3b44146c0e01cc68c708d79dda1fb6bb971
Parents: 0d59832
Author: zhangduo <zhang...@apache.org>
Authored: Fri Jan 19 18:38:39 2018 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Thu Feb 15 12:30:26 2018 +0800

----------------------------------------------------------------------
 .../hbase/regionserver/wal/AbstractFSWAL.java   |   7 +
 .../hbase/regionserver/wal/AsyncFSWAL.java      |   1 -
 .../hbase/regionserver/wal/DualAsyncFSWAL.java  |   4 +-
 .../hadoop/hbase/regionserver/wal/FSHLog.java   |   3 -
 .../regionserver/PeerActionListener.java        |  33 +++
 .../SynchronousReplicationPeerProvider.java     |  35 +++
 .../hadoop/hbase/wal/AbstractFSWALProvider.java |   1 +
 .../hadoop/hbase/wal/AsyncFSWALProvider.java    |  18 +-
 .../hbase/wal/NettyAsyncFSWALConfigHelper.java  |   8 +-
 .../hbase/wal/RegionGroupingProvider.java       |  13 +-
 .../wal/SynchronousReplicationWALProvider.java  | 225 +++++++++++++++++++
 .../org/apache/hadoop/hbase/wal/WALFactory.java |  37 ++-
 .../org/apache/hadoop/hbase/wal/WALKeyImpl.java |  16 +-
 .../regionserver/TestCompactionPolicy.java      |   1 +
 .../regionserver/TestFailedAppendAndSync.java   | 122 +++++-----
 .../hadoop/hbase/regionserver/TestHRegion.java  |  24 +-
 .../TestHRegionWithInMemoryFlush.java           |   7 -
 .../hbase/regionserver/TestRegionIncrement.java |  20 +-
 .../hbase/regionserver/TestWALLockup.java       |   1 +
 .../regionserver/wal/AbstractTestWALReplay.java |   1 +
 .../regionserver/wal/ProtobufLogTestHelper.java |  44 +++-
 .../hbase/regionserver/wal/TestAsyncFSWAL.java  |  13 +-
 .../regionserver/wal/TestAsyncWALReplay.java    |   4 +-
 .../wal/TestCombinedAsyncWriter.java            |   3 +-
 .../hbase/regionserver/wal/TestFSHLog.java      |  15 +-
 .../hbase/regionserver/wal/TestWALReplay.java   |   1 +
 .../apache/hadoop/hbase/wal/IOTestProvider.java |   2 -
 .../TestSynchronousReplicationWALProvider.java  | 153 +++++++++++++
 28 files changed, 659 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e339f3b4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 14fbe10..31b1c54 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -430,6 +430,13 @@ public abstract class AbstractFSWAL<W extends WriterBase> 
implements WAL {
     this.implClassName = getClass().getSimpleName();
   }
 
+  /**
+   * Used to initialize the WAL. Usually just call rollWriter to create the 
first log writer.
+   */
+  public void init() throws IOException {
+    rollWriter();
+  }
+
   @Override
   public void registerWALActionsListener(WALActionsListener listener) {
     this.listeners.add(listener);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e339f3b4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index 8e57441..ac72dc7 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -247,7 +247,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE);
     waitOnShutdownInSeconds = 
conf.getInt(ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS,
       DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS);
-    rollWriter();
   }
 
   private static boolean waitingRoll(int epochAndState) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/e339f3b4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
index 6bf9e02..f92ce93 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
@@ -38,14 +38,14 @@ public class DualAsyncFSWAL extends AsyncFSWAL {
 
   private final Path remoteWalDir;
 
-  public DualAsyncFSWAL(FileSystem fs, FileSystem remoteFs, Path rootDir, Path 
remoteRootDir,
+  public DualAsyncFSWAL(FileSystem fs, FileSystem remoteFs, Path rootDir, Path 
remoteWalDir,
       String logDir, String archiveDir, Configuration conf, 
List<WALActionsListener> listeners,
       boolean failIfWALExists, String prefix, String suffix, EventLoopGroup 
eventLoopGroup,
       Class<? extends Channel> channelClass) throws FailedLogCloseException, 
IOException {
     super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, 
prefix, suffix,
         eventLoopGroup, channelClass);
     this.remoteFs = remoteFs;
-    this.remoteWalDir = new Path(remoteRootDir, logDir);
+    this.remoteWalDir = remoteWalDir;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/e339f3b4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 2023932..b11a052 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -212,9 +212,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
       5);
     this.closeErrorsTolerated = 
conf.getInt("hbase.regionserver.logroll.errors.tolerated", 2);
 
-    // rollWriter sets this.hdfs_out if it can.
-    rollWriter();
-
     // This is the 'writer' -- a single threaded executor. This single thread 
'consumes' what is
     // put on the ring buffer.
     String hostingThreadName = Thread.currentThread().getName();

http://git-wip-us.apache.org/repos/asf/hbase/blob/e339f3b4/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerActionListener.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerActionListener.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerActionListener.java
new file mode 100644
index 0000000..74ad626
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerActionListener.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Get notification for replication peer events. Mainly used for telling the
+ * {@link org.apache.hadoop.hbase.wal.SynchronousReplicationWALProvider} to 
close some WAL if not
+ * used any more.
+ * <p>
+ * TODO: Also need a synchronous peer state change notification.
+ */
+@InterfaceAudience.Private
+public interface PeerActionListener {
+
+  default void peerRemoved(String peerId) {}
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e339f3b4/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SynchronousReplicationPeerProvider.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SynchronousReplicationPeerProvider.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SynchronousReplicationPeerProvider.java
new file mode 100644
index 0000000..b4e04fb
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SynchronousReplicationPeerProvider.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.util.Optional;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Get the peer id and remote root dir if the region is synchronously 
replicated.
+ */
+@InterfaceAudience.Private
+public interface SynchronousReplicationPeerProvider {
+
+  /**
+   * Return the peer id and remote WAL directory if the region is 
synchronously replicated.
+   */
+  Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info);
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e339f3b4/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index 231afd5..3eb8f8f 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -137,6 +137,7 @@ public abstract class AbstractFSWALProvider<T extends 
AbstractFSWAL<?>> implemen
         if (walCopy == null) {
           walCopy = createWAL();
           wal = walCopy;
+          walCopy.init();
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e339f3b4/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
index 9c62bed..84e859d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.wal;
 
 import java.io.IOException;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -31,12 +30,10 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
 import org.apache.hbase.thirdparty.io.netty.channel.Channel;
 import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
-import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
-import 
org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
-import 
org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultThreadFactory;
 
 /**
  * A WAL provider that use {@link AsyncFSWAL}.
@@ -61,6 +58,7 @@ public class AsyncFSWALProvider extends 
AbstractFSWALProvider<AsyncFSWAL> {
   private EventLoopGroup eventLoopGroup;
 
   private Class<? extends Channel> channelClass;
+
   @Override
   protected AsyncFSWAL createWAL() throws IOException {
     return new AsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), 
CommonFSUtils.getWALRootDir(conf),
@@ -73,15 +71,9 @@ public class AsyncFSWALProvider extends 
AbstractFSWALProvider<AsyncFSWAL> {
   @Override
   protected void doInit(Configuration conf) throws IOException {
     Pair<EventLoopGroup, Class<? extends Channel>> 
eventLoopGroupAndChannelClass =
-        NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf);
-    if (eventLoopGroupAndChannelClass != null) {
-      eventLoopGroup = eventLoopGroupAndChannelClass.getFirst();
-      channelClass = eventLoopGroupAndChannelClass.getSecond();
-    } else {
-      eventLoopGroup = new NioEventLoopGroup(1,
-          new DefaultThreadFactory("AsyncFSWAL", true, Thread.MAX_PRIORITY));
-      channelClass = NioSocketChannel.class;
-    }
+      NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf);
+    eventLoopGroup = eventLoopGroupAndChannelClass.getFirst();
+    channelClass = eventLoopGroupAndChannelClass.getSecond();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/e339f3b4/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/NettyAsyncFSWALConfigHelper.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/NettyAsyncFSWALConfigHelper.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/NettyAsyncFSWALConfigHelper.java
index 0836b5d..00ccb71 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/NettyAsyncFSWALConfigHelper.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/NettyAsyncFSWALConfigHelper.java
@@ -26,6 +26,9 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.io.netty.channel.Channel;
 import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
+import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
+import 
org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
+import 
org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultThreadFactory;
 
 /**
  * Helper class for passing netty event loop config to {@link 
AsyncFSWALProvider}.
@@ -55,7 +58,10 @@ public class NettyAsyncFSWALConfigHelper {
   static Pair<EventLoopGroup, Class<? extends Channel>> 
getEventLoopConfig(Configuration conf) {
     String name = conf.get(EVENT_LOOP_CONFIG);
     if (StringUtils.isBlank(name)) {
-      return null;
+      // create new event loop group if config is empty
+      return Pair.<EventLoopGroup, Class<? extends Channel>> newPair(
+        new NioEventLoopGroup(0, new DefaultThreadFactory("AsyncFSWAL", true, 
Thread.MAX_PRIORITY)),
+        NioSocketChannel.class);
     }
     return EVENT_LOOP_CONFIG_MAP.get(name);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e339f3b4/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
index 28817e9..0b7b8da 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
@@ -31,6 +31,7 @@ import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
 // imports for classes still in regionserver.wal
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -132,6 +133,7 @@ public class RegionGroupingProvider implements WALProvider {
 
   private RegionGroupingStrategy strategy;
   private WALFactory factory;
+  private Configuration conf;
   private List<WALActionsListener> listeners = new ArrayList<>();
   private String providerId;
   private Class<? extends WALProvider> providerClass;
@@ -141,6 +143,7 @@ public class RegionGroupingProvider implements WALProvider {
     if (null != strategy) {
       throw new IllegalStateException("WALProvider.init should only be called 
once.");
     }
+    this.conf = conf;
     this.factory = factory;
     StringBuilder sb = new StringBuilder().append(factory.factoryId);
     if (providerId != null) {
@@ -156,11 +159,11 @@ public class RegionGroupingProvider implements 
WALProvider {
   }
 
   private WALProvider createProvider(String group) throws IOException {
-    if (META_WAL_PROVIDER_ID.equals(providerId)) {
-      return factory.createProvider(providerClass, META_WAL_PROVIDER_ID);
-    } else {
-      return factory.createProvider(providerClass, group);
-    }
+    WALProvider provider = WALFactory.createProvider(providerClass);
+    provider.init(factory, conf,
+      META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : group);
+    provider.addWALActionsListener(new MetricsWAL());
+    return provider;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/e339f3b4/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SynchronousReplicationWALProvider.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SynchronousReplicationWALProvider.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SynchronousReplicationWALProvider.java
new file mode 100644
index 0000000..f60599f
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SynchronousReplicationWALProvider.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import static 
org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
+import static 
org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getWALArchiveDirectoryName;
+import static 
org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getWALDirectoryName;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.replication.regionserver.PeerActionListener;
+import 
org.apache.hadoop.hbase.replication.regionserver.SynchronousReplicationPeerProvider;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.KeyLocker;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Streams;
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
+
+/**
+ * The special {@link WALProvider} for synchronous replication.
+ * <p>
+ * It works like an interceptor, when getting WAL, first it will check if the 
given region should be
+ * replicated synchronously, if so it will return a special WAL for it, 
otherwise it will delegate
+ * the request to the normal {@link WALProvider}.
+ */
+@InterfaceAudience.Private
+public class SynchronousReplicationWALProvider implements WALProvider, 
PeerActionListener {
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(SynchronousReplicationWALProvider.class);
+
+  private static final String LOG_SUFFIX = ".syncrep";
+
+  private final WALProvider provider;
+
+  private final SynchronousReplicationPeerProvider peerProvider;
+
+  private WALFactory factory;
+
+  private Configuration conf;
+
+  private List<WALActionsListener> listeners = new ArrayList<>();
+
+  private EventLoopGroup eventLoopGroup;
+
+  private Class<? extends Channel> channelClass;
+
+  private AtomicBoolean initialized = new AtomicBoolean(false);
+
+  private final ConcurrentMap<String, DualAsyncFSWAL> peerId2WAL = new 
ConcurrentHashMap<>();
+
+  private final KeyLocker<String> createLock = new KeyLocker<>();
+
+  SynchronousReplicationWALProvider(WALProvider provider,
+      SynchronousReplicationPeerProvider peerProvider) {
+    this.provider = provider;
+    this.peerProvider = peerProvider;
+  }
+
+  @Override
+  public void init(WALFactory factory, Configuration conf, String providerId) 
throws IOException {
+    if (!initialized.compareAndSet(false, true)) {
+      throw new IllegalStateException("WALProvider.init should only be called 
once.");
+    }
+    provider.init(factory, conf, providerId);
+    this.conf = conf;
+    this.factory = factory;
+    Pair<EventLoopGroup, Class<? extends Channel>> 
eventLoopGroupAndChannelClass =
+      NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf);
+    eventLoopGroup = eventLoopGroupAndChannelClass.getFirst();
+    channelClass = eventLoopGroupAndChannelClass.getSecond();
+  }
+
+  private String getLogPrefix(String peerId) {
+    return factory.factoryId + WAL_FILE_NAME_DELIMITER + peerId;
+  }
+
+  private DualAsyncFSWAL createWAL(String peerId, String remoteWALDir) throws 
IOException {
+    Path remoteWALDirPath = new Path(remoteWALDir);
+    FileSystem remoteFs = remoteWALDirPath.getFileSystem(conf);
+    return new DualAsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), remoteFs,
+        CommonFSUtils.getWALRootDir(conf), new Path(remoteWALDirPath, peerId),
+        getWALDirectoryName(factory.factoryId), 
getWALArchiveDirectoryName(conf, factory.factoryId),
+        conf, listeners, true, getLogPrefix(peerId), LOG_SUFFIX, 
eventLoopGroup, channelClass);
+  }
+
+  private DualAsyncFSWAL getWAL(String peerId, String remoteWALDir) throws 
IOException {
+    DualAsyncFSWAL wal = peerId2WAL.get(peerId);
+    if (wal != null) {
+      return wal;
+    }
+    Lock lock = createLock.acquireLock(peerId);
+    try {
+      wal = peerId2WAL.get(peerId);
+      if (wal == null) {
+        wal = createWAL(peerId, remoteWALDir);
+        peerId2WAL.put(peerId, wal);
+        wal.init();
+      }
+      return wal;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public WAL getWAL(RegionInfo region) throws IOException {
+    Optional<Pair<String, String>> peerIdAndRemoteWALDir =
+      peerProvider.getPeerIdAndRemoteWALDir(region);
+    if (peerIdAndRemoteWALDir.isPresent()) {
+      Pair<String, String> pair = peerIdAndRemoteWALDir.get();
+      return getWAL(pair.getFirst(), pair.getSecond());
+    } else {
+      return provider.getWAL(region);
+    }
+  }
+
+  private Stream<WAL> getWALStream() {
+    return Streams.concat(peerId2WAL.values().stream(), 
provider.getWALs().stream());
+  }
+
+  @Override
+  public List<WAL> getWALs() {
+    return getWALStream().collect(Collectors.toList());
+  }
+
+  @Override
+  public void shutdown() throws IOException {
+    // save the last exception and rethrow
+    IOException failure = null;
+    for (DualAsyncFSWAL wal : peerId2WAL.values()) {
+      try {
+        wal.shutdown();
+      } catch (IOException e) {
+        LOG.error("Shutdown WAL failed", e);
+        failure = e;
+      }
+    }
+    provider.shutdown();
+    if (failure != null) {
+      throw failure;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    // save the last exception and rethrow
+    IOException failure = null;
+    for (DualAsyncFSWAL wal : peerId2WAL.values()) {
+      try {
+        wal.close();
+      } catch (IOException e) {
+        LOG.error("Close WAL failed", e);
+        failure = e;
+      }
+    }
+    provider.close();
+    if (failure != null) {
+      throw failure;
+    }
+  }
+
+  @Override
+  public long getNumLogFiles() {
+    return peerId2WAL.size() + provider.getNumLogFiles();
+  }
+
+  @Override
+  public long getLogFileSize() {
+    return 
peerId2WAL.values().stream().mapToLong(DualAsyncFSWAL::getLogFileSize).sum() +
+      provider.getLogFileSize();
+  }
+
+  @Override
+  public void peerRemoved(String peerId) {
+    WAL wal = peerId2WAL.remove(peerId);
+    if (wal != null) {
+      try {
+        wal.close();
+      } catch (IOException e) {
+        LOG.error("Close WAL failed", e);
+      }
+    }
+  }
+
+  @Override
+  public void addWALActionsListener(WALActionsListener listener) {
+    listeners.add(listener);
+    provider.addWALActionsListener(listener);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e339f3b4/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index 1410b53..4e519ee 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
+import 
org.apache.hadoop.hbase.replication.regionserver.SynchronousReplicationPeerProvider;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
@@ -130,13 +131,10 @@ public class WALFactory {
     }
   }
 
-  WALProvider createProvider(Class<? extends WALProvider> clazz, String 
providerId)
-      throws IOException {
-    LOG.info("Instantiating WALProvider of type " + clazz);
+  static WALProvider createProvider(Class<? extends WALProvider> clazz) throws 
IOException {
+    LOG.info("Instantiating WALProvider of type {}", clazz);
     try {
-      final WALProvider result = clazz.getDeclaredConstructor().newInstance();
-      result.init(this, conf, providerId);
-      return result;
+      return clazz.newInstance();
     } catch (Exception e) {
       LOG.error("couldn't set up WALProvider, the configured class is " + 
clazz);
       LOG.debug("Exception details for failure to load WALProvider.", e);
@@ -148,9 +146,10 @@ public class WALFactory {
    * instantiate a provider from a config property. requires conf to have 
already been set (as well
    * as anything the provider might need to read).
    */
-  WALProvider getProvider(String key, String defaultValue, String providerId) 
throws IOException {
-    Class<? extends WALProvider> clazz = getProviderClass(key, defaultValue);
-    WALProvider provider = createProvider(clazz, providerId);
+  private WALProvider getProvider(String key, String defaultValue, String 
providerId)
+      throws IOException {
+    WALProvider provider = createProvider(getProviderClass(key, defaultValue));
+    provider.init(this, conf, providerId);
     provider.addWALActionsListener(new MetricsWAL());
     return provider;
   }
@@ -182,6 +181,26 @@ public class WALFactory {
   }
 
   /**
+   * A temporary constructor for testing synchronous replication.
+   * <p>
+   * Remove it once we can integrate the synchronous replication logic in RS.
+   */
+  @VisibleForTesting
+  WALFactory(Configuration conf, String factoryId, 
SynchronousReplicationPeerProvider peerProvider)
+      throws IOException {
+    timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
+    /* TODO Both of these are probably specific to the fs wal provider */
+    logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", 
ProtobufLogReader.class,
+      AbstractFSWALProvider.Reader.class);
+    this.conf = conf;
+    this.factoryId = factoryId;
+    WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, 
DEFAULT_WAL_PROVIDER));
+    this.provider = new SynchronousReplicationWALProvider(provider, 
peerProvider);
+    this.provider.addWALActionsListener(new MetricsWAL());
+    this.provider.init(this, conf, null);
+  }
+
+  /**
    * Shutdown all WALs and clean up any underlying storage.
    * Use only when you will not need to replay and edits that have gone to any 
wals from this
    * factory.

http://git-wip-us.apache.org/repos/asf/hbase/blob/e339f3b4/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java
index c1a77ee..4d1ed91 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java
@@ -130,13 +130,21 @@ public class WALKeyImpl implements WALKey {
   }
 
   @VisibleForTesting
-  public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename,
-                long logSeqNum,
+  public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, 
long logSeqNum,
       final long now, UUID clusterId) {
     List<UUID> clusterIds = new ArrayList<>(1);
     clusterIds.add(clusterId);
-    init(encodedRegionName, tablename, logSeqNum, now, clusterIds,
-        HConstants.NO_NONCE, HConstants.NO_NONCE, null, null);
+    init(encodedRegionName, tablename, logSeqNum, now, clusterIds, 
HConstants.NO_NONCE,
+      HConstants.NO_NONCE, null, null);
+  }
+
+  @VisibleForTesting
+  public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, 
long logSeqNum,
+      final long now, UUID clusterId, MultiVersionConcurrencyControl mvcc) {
+    List<UUID> clusterIds = new ArrayList<>(1);
+    clusterIds.add(clusterId);
+    init(encodedRegionName, tablename, logSeqNum, now, clusterIds, 
HConstants.NO_NONCE,
+      HConstants.NO_NONCE, mvcc, null);
   }
 
   // TODO: Fix being able to pass in sequenceid.

http://git-wip-us.apache.org/repos/asf/hbase/blob/e339f3b4/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java
index ca4b227..939f35c 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java
@@ -100,6 +100,7 @@ public class TestCompactionPolicy {
     HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
 
     hlog = new FSHLog(fs, basedir, logName, conf);
+    hlog.init();
     ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 
0, null);
     region = HRegion.createHRegion(info, basedir, conf, htd, hlog);
     region.close();

http://git-wip-us.apache.org/repos/asf/hbase/blob/e339f3b4/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
index 7cb3e63..8aa282e 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
@@ -102,65 +102,64 @@ public class TestFailedAppendAndSync {
     return name.getMethodName();
   }
 
-  /**
-   * Reproduce locking up that happens when we get an exceptions appending and 
syncing.
-   * See HBASE-14317.
-   * First I need to set up some mocks for Server and RegionServerServices. I 
also need to
-   * set up a dodgy WAL that will throw an exception when we go to append to 
it.
-   */
-  @Test
-  public void testLockupAroundBadAssignSync() throws IOException {
+  // Dodgy WAL. Will throw exceptions when flags set.
+  class DodgyFSLog extends FSHLog {
+    volatile boolean throwSyncException = false;
+    volatile boolean throwAppendException = false;
     final AtomicLong rolls = new AtomicLong(0);
-    // Dodgy WAL. Will throw exceptions when flags set.
-    class DodgyFSLog extends FSHLog {
-      volatile boolean throwSyncException = false;
-      volatile boolean throwAppendException = false;
 
-      public DodgyFSLog(FileSystem fs, Path root, String logDir, Configuration 
conf)
-      throws IOException {
-        super(fs, root, logDir, conf);
-      }
-
-      @Override
-      public byte[][] rollWriter(boolean force) throws 
FailedLogCloseException, IOException {
-        byte [][] regions = super.rollWriter(force);
-        rolls.getAndIncrement();
-        return regions;
-      }
+    public DodgyFSLog(FileSystem fs, Path root, String logDir, Configuration 
conf)
+        throws IOException {
+      super(fs, root, logDir, conf);
+    }
 
-      @Override
-      protected Writer createWriterInstance(Path path) throws IOException {
-        final Writer w = super.createWriterInstance(path);
-          return new Writer() {
-            @Override
-            public void close() throws IOException {
-              w.close();
-            }
+    @Override
+    public byte[][] rollWriter(boolean force) throws FailedLogCloseException, 
IOException {
+      byte[][] regions = super.rollWriter(force);
+      rolls.getAndIncrement();
+      return regions;
+    }
 
-            @Override
-            public void sync() throws IOException {
-              if (throwSyncException) {
-                throw new IOException("FAKE! Failed to replace a bad 
datanode...");
-              }
-              w.sync();
-            }
+    @Override
+    protected Writer createWriterInstance(Path path) throws IOException {
+      final Writer w = super.createWriterInstance(path);
+      return new Writer() {
+        @Override
+        public void close() throws IOException {
+          w.close();
+        }
 
-            @Override
-            public void append(Entry entry) throws IOException {
-              if (throwAppendException) {
-                throw new IOException("FAKE! Failed to replace a bad 
datanode...");
-              }
-              w.append(entry);
-            }
+        @Override
+        public void sync() throws IOException {
+          if (throwSyncException) {
+            throw new IOException("FAKE! Failed to replace a bad datanode...");
+          }
+          w.sync();
+        }
 
-            @Override
-            public long getLength() {
-              return w.getLength();
-              }
-            };
+        @Override
+        public void append(Entry entry) throws IOException {
+          if (throwAppendException) {
+            throw new IOException("FAKE! Failed to replace a bad datanode...");
           }
-      }
+          w.append(entry);
+        }
 
+        @Override
+        public long getLength() {
+          return w.getLength();
+        }
+      };
+    }
+  }
+  /**
+   * Reproduce locking up that happens when we get an exceptions appending and 
syncing.
+   * See HBASE-14317.
+   * First I need to set up some mocks for Server and RegionServerServices. I 
also need to
+   * set up a dodgy WAL that will throw an exception when we go to append to 
it.
+   */
+  @Test
+  public void testLockupAroundBadAssignSync() throws IOException {
     // Make up mocked server and services.
     Server server = mock(Server.class);
     when(server.getConfiguration()).thenReturn(CONF);
@@ -172,6 +171,7 @@ public class TestFailedAppendAndSync {
     FileSystem fs = FileSystem.get(CONF);
     Path rootDir = new Path(dir + getName());
     DodgyFSLog dodgyWAL = new DodgyFSLog(fs, rootDir, getName(), CONF);
+    dodgyWAL.init();
     LogRoller logRoller = new LogRoller(server, services);
     logRoller.addWAL(dodgyWAL);
     logRoller.start();
@@ -192,7 +192,7 @@ public class TestFailedAppendAndSync {
       } catch (IOException ioe) {
         fail();
       }
-      long rollsCount = rolls.get();
+      long rollsCount = dodgyWAL.rolls.get();
       try {
         dodgyWAL.throwAppendException = true;
         dodgyWAL.throwSyncException = false;
@@ -202,8 +202,10 @@ public class TestFailedAppendAndSync {
       } catch (IOException ioe) {
         threwOnAppend = true;
       }
-      while (rollsCount == rolls.get()) Threads.sleep(100);
-      rollsCount = rolls.get();
+      while (rollsCount == dodgyWAL.rolls.get()) {
+        Threads.sleep(100);
+      }
+      rollsCount = dodgyWAL.rolls.get();
 
       // When we get to here.. we should be ok. A new WAL has been put in 
place. There were no
       // appends to sync. We should be able to continue.
@@ -217,14 +219,16 @@ public class TestFailedAppendAndSync {
       } catch (IOException ioe) {
         threwOnBoth = true;
       }
-      while (rollsCount == rolls.get()) Threads.sleep(100);
+      while (rollsCount == dodgyWAL.rolls.get()) {
+        Threads.sleep(100);
+      }
 
       // Again, all should be good. New WAL and no outstanding unsync'd edits 
so we should be able
       // to just continue.
 
       // So, should be no abort at this stage. Verify.
-      Mockito.verify(server, Mockito.atLeast(0)).
-        abort(Mockito.anyString(), (Throwable)Mockito.anyObject());
+      Mockito.verify(server, Mockito.atLeast(0)).abort(Mockito.anyString(),
+        Mockito.any(Throwable.class));
       try {
         dodgyWAL.throwAppendException = false;
         dodgyWAL.throwSyncException = true;
@@ -239,8 +243,8 @@ public class TestFailedAppendAndSync {
       // happens. If it don't we'll timeout the whole test. That is fine.
       while (true) {
         try {
-          Mockito.verify(server, Mockito.atLeast(1)).
-            abort(Mockito.anyString(), (Throwable)Mockito.anyObject());
+          Mockito.verify(server, Mockito.atLeast(1)).abort(Mockito.anyString(),
+            Mockito.any(Throwable.class));
           break;
         } catch (WantedButNotInvoked t) {
           Threads.sleep(1);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e339f3b4/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 88e1aa2..df24e0a 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -217,7 +217,6 @@ public class TestHRegion {
   protected static HBaseTestingUtility TEST_UTIL;
   public static Configuration CONF ;
   private String dir;
-  private static FileSystem FILESYSTEM;
   private final int MAX_VERSIONS = 2;
 
   // Test names
@@ -239,7 +238,6 @@ public class TestHRegion {
   @Before
   public void setup() throws IOException {
     TEST_UTIL = HBaseTestingUtility.createLocalHTU();
-    FILESYSTEM = TEST_UTIL.getTestFileSystem();
     CONF = TEST_UTIL.getConfiguration();
     dir = TEST_UTIL.getDataTestDir("TestHRegion").toString();
     method = name.getMethodName();
@@ -341,6 +339,7 @@ public class TestHRegion {
     FileSystem fs = FileSystem.get(CONF);
     Path rootDir = new Path(dir + "testMemstoreSnapshotSize");
     MyFaultyFSLog faultyLog = new MyFaultyFSLog(fs, rootDir, 
"testMemstoreSnapshotSize", CONF);
+    faultyLog.init();
     HRegion region = initHRegion(tableName, null, null, false, 
Durability.SYNC_WAL, faultyLog,
         COLUMN_FAMILY_BYTES);
 
@@ -352,7 +351,6 @@ public class TestHRegion {
     Put put = new Put(value);
     put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
     faultyLog.setFailureType(FaultyFSLog.FailureType.SYNC);
-
     boolean threwIOE = false;
     try {
       region.put(put);
@@ -388,6 +386,7 @@ public class TestHRegion {
     FileSystem fs = FileSystem.get(CONF);
     Path rootDir = new Path(dir + testName);
     FSHLog hLog = new FSHLog(fs, rootDir, testName, CONF);
+    hLog.init();
     HRegion region = initHRegion(tableName, null, null, false, 
Durability.SYNC_WAL, hLog,
         COLUMN_FAMILY_BYTES);
     HStore store = region.getStore(COLUMN_FAMILY_BYTES);
@@ -1162,6 +1161,7 @@ public class TestHRegion {
     FailAppendFlushMarkerWAL wal =
       new FailAppendFlushMarkerWAL(FileSystem.get(walConf), 
FSUtils.getRootDir(walConf),
         method, walConf);
+    wal.init();
     this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
       HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family);
     try {
@@ -1193,7 +1193,7 @@ public class TestHRegion {
       wal.flushActions = new FlushAction [] {FlushAction.COMMIT_FLUSH};
       wal = new FailAppendFlushMarkerWAL(FileSystem.get(walConf), 
FSUtils.getRootDir(walConf),
             method, walConf);
-
+      wal.init();
       this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
         HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family);
       region.put(put);
@@ -2445,6 +2445,7 @@ public class TestHRegion {
     FileSystem fs = FileSystem.get(CONF);
     Path rootDir = new Path(dir + "testDataInMemoryWithoutWAL");
     FSHLog hLog = new FSHLog(fs, rootDir, "testDataInMemoryWithoutWAL", CONF);
+    hLog.init();
     // This chunk creation is done throughout the code base. Do we want to 
move it into core?
     // It is missing from this test. W/o it we NPE.
     ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 
0, null);
@@ -2497,9 +2498,9 @@ public class TestHRegion {
     RegionCoprocessorHost mockedCPHost = 
Mockito.mock(RegionCoprocessorHost.class);
     // Because the preBatchMutate returns void, we can't do usual Mockito 
when...then form. Must
     // do below format (from Mockito doc).
-    Mockito.doAnswer(new Answer() {
+    Mockito.doAnswer(new Answer<Void>() {
       @Override
-      public Object answer(InvocationOnMock invocation) throws Throwable {
+      public Void answer(InvocationOnMock invocation) throws Throwable {
         MiniBatchOperationInProgress<Mutation> mb = invocation.getArgument(0);
         mb.addOperationsFromCP(0, new Mutation[]{addPut});
         return null;
@@ -3793,9 +3794,12 @@ public class TestHRegion {
 
         boolean previousEmpty = res.isEmpty();
         res.clear();
-        InternalScanner scanner = region.getScanner(scan);
-        while (scanner.next(res))
-          ;
+        try (InternalScanner scanner = region.getScanner(scan)) {
+          boolean moreRows;
+          do {
+            moreRows = scanner.next(res);
+          } while (moreRows);
+        }
         if (!res.isEmpty() || !previousEmpty || i > compactInterval) {
           assertEquals("i=" + i, expectedCount, res.size());
           long timestamp = res.get(0).getTimestamp();
@@ -3891,7 +3895,7 @@ public class TestHRegion {
             region.put(put);
             numPutsFinished++;
             if (numPutsFinished > 0 && numPutsFinished % 47 == 0) {
-              System.out.println("put iteration = " + numPutsFinished);
+              LOG.debug("put iteration = {}", numPutsFinished);
               Delete delete = new Delete(row, (long) numPutsFinished - 30);
               region.delete(delete);
             }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e339f3b4/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java
index dfe52d0..58f62e3 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionWithInMemoryFlush.java
@@ -27,25 +27,18 @@ import 
org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.junit.ClassRule;
 import org.junit.experimental.categories.Category;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * A test similar to TestHRegion, but with in-memory flush families.
  * Also checks wal truncation after in-memory compaction.
  */
 @Category({VerySlowRegionServerTests.class, LargeTests.class})
-@SuppressWarnings("deprecation")
 public class TestHRegionWithInMemoryFlush extends TestHRegion{
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
       HBaseClassTestRule.forClass(TestHRegionWithInMemoryFlush.class);
 
-  // Do not spin up clusters in here. If you need to spin up a cluster, do it
-  // over in TestHRegionOnCluster.
-  private static final Logger LOG = 
LoggerFactory.getLogger(TestHRegionWithInMemoryFlush.class);
-
   /**
    * @return A region on which you must call
    *         {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.

http://git-wip-us.apache.org/repos/asf/hbase/blob/e339f3b4/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionIncrement.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionIncrement.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionIncrement.java
index 8b96fa7..e5006ea 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionIncrement.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionIncrement.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Scan;
@@ -36,7 +37,6 @@ import 
org.apache.hadoop.hbase.client.TestIncrementsFromClientSide;
 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.wal.WAL;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
@@ -81,12 +81,12 @@ public class TestRegionIncrement {
   }
 
   private HRegion getRegion(final Configuration conf, final String tableName) 
throws IOException {
-    WAL wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDataTestDir(),
-      TEST_UTIL.getDataTestDir().toString(), conf);
+    FSHLog wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDataTestDir(),
+        TEST_UTIL.getDataTestDir().toString(), conf);
+    wal.init();
     ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 
0, null);
-    return (HRegion)TEST_UTIL.createLocalHRegion(Bytes.toBytes(tableName),
-      HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, tableName, 
conf,
-      false, Durability.SKIP_WAL, wal, INCREMENT_BYTES);
+    return TEST_UTIL.createLocalHRegion(TableName.valueOf(tableName), 
HConstants.EMPTY_BYTE_ARRAY,
+      HConstants.EMPTY_BYTE_ARRAY, false, Durability.SKIP_WAL, wal, 
INCREMENT_BYTES);
   }
 
   private void closeRegion(final HRegion region) throws IOException {
@@ -170,8 +170,6 @@ public class TestRegionIncrement {
 
   /**
    * Have each thread update its own Cell. Avoid contention with another 
thread.
-   * @throws IOException
-   * @throws InterruptedException
    */
   @Test
   public void testUnContendedSingleCellIncrement()
@@ -209,13 +207,9 @@ public class TestRegionIncrement {
 
   /**
    * Have each thread update its own Cell. Avoid contention with another 
thread.
-   * This is
-   * @throws IOException
-   * @throws InterruptedException
    */
   @Test
-  public void testContendedAcrossCellsIncrement()
-  throws IOException, InterruptedException {
+  public void testContendedAcrossCellsIncrement() throws IOException, 
InterruptedException {
     final HRegion region = getRegion(TEST_UTIL.getConfiguration(),
         
TestIncrementsFromClientSide.filterStringSoTableNameSafe(this.name.getMethodName()));
     long startTime = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/hbase/blob/e339f3b4/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
index ca65914..8913343 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
@@ -214,6 +214,7 @@ public class TestWALLockup {
     FileSystem fs = FileSystem.get(CONF);
     Path rootDir = new Path(dir + getName());
     DodgyFSLog dodgyWAL = new DodgyFSLog(fs, rootDir, getName(), CONF);
+    dodgyWAL.init();
     Path originalWAL = dodgyWAL.getCurrentFileName();
     // I need a log roller running.
     LogRoller logRoller = new LogRoller(server, services);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e339f3b4/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
index b1e304e..7600fe9 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
@@ -1097,6 +1097,7 @@ public abstract class AbstractTestWALReplay {
 
   private MockWAL createMockWAL() throws IOException {
     MockWAL wal = new MockWAL(fs, hbaseRootDir, logName, conf);
+    wal.init();
     // Set down maximum recovery so we dfsclient doesn't linger retrying 
something
     // long gone.
     HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e339f3b4/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogTestHelper.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogTestHelper.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogTestHelper.java
index ecd8e6c..49633cb 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogTestHelper.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogTestHelper.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
@@ -41,7 +42,7 @@ import org.apache.hadoop.hbase.wal.WALProvider;
 /**
  * Helper class for testing protobuf log.
  */
-final class ProtobufLogTestHelper {
+public final class ProtobufLogTestHelper {
 
   private ProtobufLogTestHelper() {
   }
@@ -54,17 +55,22 @@ final class ProtobufLogTestHelper {
     return RegionInfoBuilder.newBuilder(tableName).setRegionId(1024).build();
   }
 
+  private static WAL.Entry generateEdit(int i, RegionInfo hri, TableName 
tableName, byte[] row,
+      int columnCount, long timestamp, MultiVersionConcurrencyControl mvcc) {
+    WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, i, 
timestamp,
+        HConstants.DEFAULT_CLUSTER_ID, mvcc);
+    WALEdit edit = new WALEdit();
+    int prefix = i;
+    IntStream.range(0, columnCount).mapToObj(j -> toValue(prefix, j))
+        .map(value -> new KeyValue(row, row, row, timestamp, 
value)).forEachOrdered(edit::add);
+    return new WAL.Entry(key, edit);
+  }
+
   public static void doWrite(WALProvider.Writer writer, boolean withTrailer, 
TableName tableName,
       int columnCount, int recordCount, byte[] row, long timestamp) throws 
IOException {
     RegionInfo hri = toRegionInfo(tableName);
     for (int i = 0; i < recordCount; i++) {
-      WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, 
i, timestamp,
-          HConstants.DEFAULT_CLUSTER_ID);
-      WALEdit edit = new WALEdit();
-      int prefix = i;
-      IntStream.range(0, columnCount).mapToObj(j -> toValue(prefix, j))
-          .map(value -> new KeyValue(row, row, row, timestamp, 
value)).forEachOrdered(edit::add);
-      writer.append(new WAL.Entry(key, edit));
+      writer.append(generateEdit(i, hri, tableName, row, columnCount, 
timestamp, null));
     }
     writer.sync();
     if (withTrailer) {
@@ -72,14 +78,24 @@ final class ProtobufLogTestHelper {
     }
   }
 
-  public static void doRead(ProtobufLogReader reader, boolean withTrailer, 
TableName tableName,
-      int columnCount, int recordCount, byte[] row, long timestamp) throws 
IOException {
+  public static void doWrite(WAL wal, RegionInfo hri, TableName tableName, int 
columnCount,
+      int recordCount, byte[] row, long timestamp, 
MultiVersionConcurrencyControl mvcc)
+      throws IOException {
+    for (int i = 0; i < recordCount; i++) {
+      WAL.Entry entry = generateEdit(i, hri, tableName, row, columnCount, 
timestamp, mvcc);
+      wal.append(hri, entry.getKey(), entry.getEdit(), true);
+    }
+    wal.sync();
+  }
+
+  public static void doRead(ProtobufLogReader reader, boolean withTrailer, 
RegionInfo hri,
+      TableName tableName, int columnCount, int recordCount, byte[] row, long 
timestamp)
+      throws IOException {
     if (withTrailer) {
       assertNotNull(reader.trailer);
     } else {
       assertNull(reader.trailer);
     }
-    RegionInfo hri = toRegionInfo(tableName);
     for (int i = 0; i < recordCount; ++i) {
       WAL.Entry entry = reader.next();
       assertNotNull(entry);
@@ -96,4 +112,10 @@ final class ProtobufLogTestHelper {
     }
     assertNull(reader.next());
   }
+
+  public static void doRead(ProtobufLogReader reader, boolean withTrailer, 
TableName tableName,
+      int columnCount, int recordCount, byte[] row, long timestamp) throws 
IOException {
+    doRead(reader, withTrailer, toRegionInfo(tableName), tableName, 
columnCount, recordCount, row,
+      timestamp);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e339f3b4/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
index 450c01b..5f0f77c 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
@@ -67,8 +67,10 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
   protected AbstractFSWAL<?> newWAL(FileSystem fs, Path rootDir, String 
logDir, String archiveDir,
       Configuration conf, List<WALActionsListener> listeners, boolean 
failIfWALExists,
       String prefix, String suffix) throws IOException {
-    return new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners, 
failIfWALExists, prefix,
-        suffix, GROUP, CHANNEL_CLASS);
+    AsyncFSWAL wal = new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, 
listeners,
+        failIfWALExists, prefix, suffix, GROUP, CHANNEL_CLASS);
+    wal.init();
+    return wal;
   }
 
   @Override
@@ -76,15 +78,16 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
       String archiveDir, Configuration conf, List<WALActionsListener> 
listeners,
       boolean failIfWALExists, String prefix, String suffix, final Runnable 
action)
       throws IOException {
-    return new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners, 
failIfWALExists, prefix,
-        suffix, GROUP, CHANNEL_CLASS) {
+    AsyncFSWAL wal = new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, 
listeners,
+        failIfWALExists, prefix, suffix, GROUP, CHANNEL_CLASS) {
 
       @Override
       void atHeadOfRingBufferEventHandlerAppend() {
         action.run();
         super.atHeadOfRingBufferEventHandlerAppend();
       }
-
     };
+    wal.init();
+    return wal;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e339f3b4/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java
index 80b7477..0740954 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java
@@ -66,7 +66,9 @@ public class TestAsyncWALReplay extends AbstractTestWALReplay 
{
 
   @Override
   protected WAL createWAL(Configuration c, Path hbaseRootDir, String logName) 
throws IOException {
-    return new AsyncFSWAL(FileSystem.get(c), hbaseRootDir, logName,
+    AsyncFSWAL wal = new AsyncFSWAL(FileSystem.get(c), hbaseRootDir, logName,
         HConstants.HREGION_OLDLOGDIR_NAME, c, null, true, null, null, GROUP, 
CHANNEL_CLASS);
+    wal.init();
+    return wal;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e339f3b4/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java
index d74f9d8..36dbe0f 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java
@@ -77,8 +77,7 @@ public class TestCombinedAsyncWriter {
     CHANNEL_CLASS = NioSocketChannel.class;
     UTIL.startMiniDFSCluster(3);
     UTIL.getTestFileSystem().mkdirs(UTIL.getDataTestDirOnTestFS());
-    WALS =
-      new WALFactory(UTIL.getConfiguration(), null, 
TestCombinedAsyncWriter.class.getSimpleName());
+    WALS = new WALFactory(UTIL.getConfiguration(), 
TestCombinedAsyncWriter.class.getSimpleName());
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/hbase/blob/e339f3b4/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
index 93ea2b8..b0937f7 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
@@ -74,8 +74,10 @@ public class TestFSHLog extends AbstractTestFSWAL {
   protected AbstractFSWAL<?> newWAL(FileSystem fs, Path rootDir, String 
walDir, String archiveDir,
       Configuration conf, List<WALActionsListener> listeners, boolean 
failIfWALExists,
       String prefix, String suffix) throws IOException {
-    return new FSHLog(fs, rootDir, walDir, archiveDir, conf, listeners, 
failIfWALExists, prefix,
-        suffix);
+    FSHLog wal =
+      new FSHLog(fs, rootDir, walDir, archiveDir, conf, listeners, 
failIfWALExists, prefix, suffix);
+    wal.init();
+    return wal;
   }
 
   @Override
@@ -83,8 +85,8 @@ public class TestFSHLog extends AbstractTestFSWAL {
       String archiveDir, Configuration conf, List<WALActionsListener> 
listeners,
       boolean failIfWALExists, String prefix, String suffix, final Runnable 
action)
       throws IOException {
-    return new FSHLog(fs, rootDir, walDir, archiveDir, conf, listeners, 
failIfWALExists, prefix,
-        suffix) {
+    FSHLog wal = new FSHLog(fs, rootDir, walDir, archiveDir, conf, listeners, 
failIfWALExists,
+        prefix, suffix) {
 
       @Override
       void atHeadOfRingBufferEventHandlerAppend() {
@@ -92,6 +94,8 @@ public class TestFSHLog extends AbstractTestFSWAL {
         super.atHeadOfRingBufferEventHandlerAppend();
       }
     };
+    wal.init();
+    return wal;
   }
 
   @Test
@@ -100,6 +104,7 @@ public class TestFSHLog extends AbstractTestFSWAL {
     final String name = this.name.getMethodName();
     FSHLog log = new FSHLog(FS, FSUtils.getRootDir(CONF), name, 
HConstants.HREGION_OLDLOGDIR_NAME,
       CONF, null, true, null, null);
+    log.init();
     try {
       Field ringBufferEventHandlerField = 
FSHLog.class.getDeclaredField("ringBufferEventHandler");
       ringBufferEventHandlerField.setAccessible(true);
@@ -142,7 +147,7 @@ public class TestFSHLog extends AbstractTestFSWAL {
     try (FSHLog log =
         new FSHLog(FS, FSUtils.getRootDir(CONF), name, 
HConstants.HREGION_OLDLOGDIR_NAME, CONF,
             null, true, null, null)) {
-
+      log.init();
       log.registerWALActionsListener(new WALActionsListener() {
         @Override
         public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit)

http://git-wip-us.apache.org/repos/asf/hbase/blob/e339f3b4/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
index 649e981..66e19a8 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
@@ -48,6 +48,7 @@ public class TestWALReplay extends AbstractTestWALReplay {
   @Override
   protected WAL createWAL(Configuration c, Path hbaseRootDir, String logName) 
throws IOException {
     FSHLog wal = new FSHLog(FileSystem.get(c), hbaseRootDir, logName, c);
+    wal.init();
     // Set down maximum recovery so we dfsclient doesn't linger retrying 
something
     // long gone.
     HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e339f3b4/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
index 3928d9c..f996ce0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
@@ -104,8 +104,6 @@ public class IOTestProvider implements WALProvider {
     this.factory = factory;
     this.conf = conf;
     this.providerId = providerId != null ? providerId : DEFAULT_PROVIDER_ID;
-
-
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/e339f3b4/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSynchronousReplicationWALProvider.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSynchronousReplicationWALProvider.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSynchronousReplicationWALProvider.java
new file mode 100644
index 0000000..e6031c6
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSynchronousReplicationWALProvider.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
+import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
+import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogTestHelper;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestSynchronousReplicationWALProvider {
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static String PEER_ID = "1";
+
+  private static String REMOTE_WAL_DIR = "/RemoteWAL";
+
+  private static TableName TABLE = TableName.valueOf("table");
+
+  private static TableName TABLE_NO_REP = TableName.valueOf("table-no-rep");
+
+  private static RegionInfo REGION = 
RegionInfoBuilder.newBuilder(TABLE).build();
+
+  private static RegionInfo REGION_NO_REP = 
RegionInfoBuilder.newBuilder(TABLE_NO_REP).build();
+
+  private static WALFactory FACTORY;
+
+  private static Optional<Pair<String, String>> 
getPeerIdAndRemoteWALDir(RegionInfo info) {
+    if (info.getTable().equals(TABLE)) {
+      return Optional.of(Pair.newPair(PEER_ID, REMOTE_WAL_DIR));
+    } else {
+      return Optional.empty();
+    }
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    UTIL.startMiniDFSCluster(3);
+    FACTORY = new WALFactory(UTIL.getConfiguration(), "test",
+        TestSynchronousReplicationWALProvider::getPeerIdAndRemoteWALDir);
+    UTIL.getTestFileSystem().mkdirs(new Path(REMOTE_WAL_DIR, PEER_ID));
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws IOException {
+    FACTORY.close();
+    UTIL.shutdownMiniDFSCluster();
+  }
+
+  private void testReadWrite(DualAsyncFSWAL wal) throws Exception {
+    int recordCount = 100;
+    int columnCount = 10;
+    byte[] row = Bytes.toBytes("testRow");
+    long timestamp = System.currentTimeMillis();
+    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
+    ProtobufLogTestHelper.doWrite(wal, REGION, TABLE, columnCount, 
recordCount, row, timestamp,
+      mvcc);
+    Path localFile = wal.getCurrentFileName();
+    Path remoteFile = new Path(REMOTE_WAL_DIR + "/" + PEER_ID, 
localFile.getName());
+    try (ProtobufLogReader reader =
+      (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), 
localFile)) {
+      ProtobufLogTestHelper.doRead(reader, false, REGION, TABLE, columnCount, 
recordCount, row,
+        timestamp);
+    }
+    try (ProtobufLogReader reader =
+      (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), 
remoteFile)) {
+      ProtobufLogTestHelper.doRead(reader, false, REGION, TABLE, columnCount, 
recordCount, row,
+        timestamp);
+    }
+    wal.rollWriter();
+    DistributedFileSystem dfs = (DistributedFileSystem) 
UTIL.getDFSCluster().getFileSystem();
+    UTIL.waitFor(5000, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        return dfs.isFileClosed(localFile) && dfs.isFileClosed(remoteFile);
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        StringBuilder sb = new StringBuilder();
+        if (!dfs.isFileClosed(localFile)) {
+          sb.append(localFile + " has not been closed yet.");
+        }
+        if (!dfs.isFileClosed(remoteFile)) {
+          sb.append(remoteFile + " has not been closed yet.");
+        }
+        return sb.toString();
+      }
+    });
+    try (ProtobufLogReader reader =
+      (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), 
localFile)) {
+      ProtobufLogTestHelper.doRead(reader, true, REGION, TABLE, columnCount, 
recordCount, row,
+        timestamp);
+    }
+    try (ProtobufLogReader reader =
+      (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), 
remoteFile)) {
+      ProtobufLogTestHelper.doRead(reader, true, REGION, TABLE, columnCount, 
recordCount, row,
+        timestamp);
+    }
+  }
+
+  @Test
+  public void test() throws Exception {
+    WAL walNoRep = FACTORY.getWAL(REGION_NO_REP);
+    assertThat(walNoRep, not(instanceOf(DualAsyncFSWAL.class)));
+    DualAsyncFSWAL wal = (DualAsyncFSWAL) FACTORY.getWAL(REGION);
+    assertEquals(2, FACTORY.getWALs().size());
+    testReadWrite(wal);
+    SynchronousReplicationWALProvider walProvider =
+      (SynchronousReplicationWALProvider) FACTORY.getWALProvider();
+    walProvider.peerRemoved(PEER_ID);
+    assertEquals(1, FACTORY.getWALs().size());
+  }
+}

Reply via email to