This is an automated email from the ASF dual-hosted git repository.

elserj pushed a commit to branch HBASE-20952
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/HBASE-20952 by this push:
     new 5e55f77  HBASE-21020 WAL API changes for replication
5e55f77 is described below

commit 5e55f772c7af9ca971875fa939115776ec41aad0
Author: Ankit Singhal <[email protected]>
AuthorDate: Fri Jan 18 13:36:52 2019 -0500

    HBASE-21020 WAL API changes for replication
    
    Signed-off-by: Josh Elser <[email protected]>
---
 .../hadoop/hbase/regionserver/HRegionServer.java   |   2 +-
 .../hbase/regionserver/ReplicationService.java     |   4 +-
 .../hbase/regionserver/wal/AbstractFSWAL.java      |   7 +-
 .../hadoop/hbase/regionserver/wal/FSHLog.java      |   3 +-
 .../hbase/replication/ReplicationEndpoint.java     |  14 +-
 .../{WALEntryStream.java => FSWALEntryStream.java} | 373 +++++++++++--------
 .../HBaseInterClusterReplicationEndpoint.java      |   2 -
 .../regionserver/RecoveredReplicationSource.java   | 135 ++-----
 .../replication/regionserver/Replication.java      |   8 +-
 .../regionserver/ReplicationSource.java            |  68 +---
 .../regionserver/ReplicationSourceInterface.java   |  24 +-
 .../regionserver/ReplicationSourceManager.java     |  64 +---
 .../regionserver/ReplicationSourceShipper.java     |   8 +-
 .../regionserver/ReplicationSourceWALReader.java   |  41 +--
 .../regionserver/ReplicationSyncUp.java            |  16 +-
 .../regionserver/ReplicationThrottler.java         |   6 +-
 .../SerialReplicationSourceWALReader.java          |   5 +-
 .../replication/regionserver/WALEntryStream.java   | 397 +--------------------
 .../regionserver/WALFileLengthProvider.java        |   5 +-
 .../hadoop/hbase/wal/AbstractFSWALProvider.java    | 112 +++++-
 .../hadoop/hbase/wal/DisabledWALProvider.java      |  93 ++++-
 .../hadoop/hbase/wal/RegionGroupingProvider.java   |  31 ++
 .../hbase/wal/SyncReplicationWALProvider.java      |  25 ++
 .../org/apache/hadoop/hbase/wal/WALProvider.java   |  44 ++-
 .../hbase/replication/ReplicationSourceDummy.java  |  20 +-
 .../regionserver/TestReplicationSource.java        |  10 +-
 .../regionserver/TestReplicationSourceManager.java |  12 +-
 .../regionserver/TestReplicationThrottler.java     |   4 +-
 .../regionserver/TestWALEntryStream.java           |  65 ++--
 .../apache/hadoop/hbase/wal/IOTestProvider.java    |  43 ++-
 .../apache/hadoop/hbase/wal/TestWALFactory.java    |  15 +-
 31 files changed, 738 insertions(+), 918 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 6242d36..7c7b4cc 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -3006,7 +3006,7 @@ public class HRegionServer extends HasThread implements
       throw new IOException("Could not find class for " + classname);
     }
     T service = ReflectionUtils.newInstance(clazz, conf);
-    service.initialize(server, walFs, logDir, oldLogDir, walProvider);
+    service.initialize(server, walProvider);
     return service;
   }
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
index e9bbaea..864736b 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
@@ -18,8 +18,6 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
 import org.apache.hadoop.hbase.wal.WALProvider;
@@ -38,7 +36,7 @@ public interface ReplicationService {
    * @param walProvider can be null if not initialized inside a live region 
server environment, for
    *          example, {@code ReplicationSyncUp}.
    */
-  void initialize(Server rs, FileSystem fs, Path logdir, Path oldLogDir, 
WALProvider walProvider)
+  void initialize(Server rs, WALProvider walProvider)
       throws IOException;
 
   /**
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index ab58b67..5beda79 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -69,7 +69,6 @@ import org.apache.hadoop.hbase.wal.FSWALIdentity;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALIdentity;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
 import org.apache.hadoop.hbase.wal.WALProvider.WriterBase;
@@ -996,11 +995,11 @@ public abstract class AbstractFSWAL<W extends WriterBase> 
implements WAL {
    * https://issues.apache.org/jira/browse/HBASE-14004 for more details.
    */
   @Override
-  public OptionalLong getLogFileSizeIfBeingWritten(WALIdentity walId) {
+  public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
     rollWriterLock.lock();
     try {
-      FSWALIdentity currentPath = new FSWALIdentity(getOldPath());
-      if (walId.equals(currentPath)) {
+      Path currentPath = getOldPath();
+      if (path.equals(currentPath)) {
         W writer = this.writer;
         return writer != null ? OptionalLong.of(writer.getLength()) : 
OptionalLong.empty();
       } else {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index baa87a4..10a07db 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -215,7 +215,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
       5);
     this.closeErrorsTolerated = 
conf.getInt("hbase.regionserver.logroll.errors.tolerated", 2);
     this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, 
HRegion.DEFAULT_WAL_HSYNC);
-
     // This is the 'writer' -- a single threaded executor. This single thread 
'consumes' what is
     // put on the ring buffer.
     String hostingThreadName = Thread.currentThread().getName();
@@ -308,7 +307,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
     SyncFuture syncFuture = null;
     SafePointZigZagLatch zigzagLatch = null;
     long sequence = -1L;
-    if (this.ringBufferEventHandler != null) {
+    if (this.ringBufferEventHandler != null && writer != null) {
       // Get sequence first to avoid dead lock when ring buffer is full
       // Considering below sequence
       // 1. replaceWriter is called and zigzagLatch is initialized
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
index f4c37b1..ad326b8 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
@@ -23,15 +23,13 @@ import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.TableDescriptors;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * ReplicationEndpoint is a plugin which implements replication
@@ -55,7 +53,6 @@ public interface ReplicationEndpoint extends 
ReplicationPeerConfigListener {
   class Context {
     private final Configuration localConf;
     private final Configuration conf;
-    private final FileSystem fs;
     private final TableDescriptors tableDescriptors;
     private final ReplicationPeer replicationPeer;
     private final String peerId;
@@ -67,7 +64,6 @@ public interface ReplicationEndpoint extends 
ReplicationPeerConfigListener {
     public Context(
         final Configuration localConf,
         final Configuration conf,
-        final FileSystem fs,
         final String peerId,
         final UUID clusterId,
         final ReplicationPeer replicationPeer,
@@ -76,7 +72,6 @@ public interface ReplicationEndpoint extends 
ReplicationPeerConfigListener {
         final Abortable abortable) {
       this.localConf = localConf;
       this.conf = conf;
-      this.fs = fs;
       this.clusterId = clusterId;
       this.peerId = peerId;
       this.replicationPeer = replicationPeer;
@@ -90,9 +85,6 @@ public interface ReplicationEndpoint extends 
ReplicationPeerConfigListener {
     public Configuration getLocalConfiguration() {
       return localConf;
     }
-    public FileSystem getFilesystem() {
-      return fs;
-    }
     public UUID getClusterId() {
       return clusterId;
     }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSWALEntryStream.java
similarity index 73%
copy from 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
copy to 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSWALEntryStream.java
index 3d90153..ccd0c9a 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSWALEntryStream.java
@@ -18,7 +18,7 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
-import java.io.Closeable;
+import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.OptionalLong;
@@ -38,68 +38,79 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WAL.Reader;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALIdentity;
+import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
 
 /**
- * Streaming access to WAL entries. This class is given a queue of WAL {@link 
WALIdentity}, and
- * continually iterates through all the WAL {@link Entry} in the queue. When 
it's done reading from
- * a Wal, it dequeues it and starts reading from the next.
+ * Streaming access to WAL entries. This class is given a queue of WAL {@link 
Path}, and continually
+ * iterates through all the WAL {@link Entry} in the queue. When it's done 
reading from a Path, it
+ * dequeues it and starts reading from the next.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-class WALEntryStream implements Closeable {
-  private static final Logger LOG = 
LoggerFactory.getLogger(WALEntryStream.class);
+public class FSWALEntryStream implements WALEntryStream {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FSWALEntryStream.class);
 
-  private Reader reader;
-  private WALIdentity currentWAlIdentity;
+  private FileSystem fs;
+
+  private boolean eofAutoRecovery;
+  protected Reader reader;
+  protected WALIdentity currentWAlIdentity;
   // cache of next entry for hasNext()
-  private Entry currentEntry;
+  protected Entry currentEntry;
   // position for the current entry. As now we support peek, which means that 
the upper layer may
   // choose to return before reading the current entry, so it is not safe to 
return the value below
   // in getPosition.
-  private long currentPositionOfEntry = 0;
+  protected long currentPositionOfEntry = 0;
   // position after reading current entry
-  private long currentPositionOfReader = 0;
-  private final PriorityBlockingQueue<WALIdentity> logQueue;
-  private final FileSystem fs;
-  private final Configuration conf;
-  private final WALFileLengthProvider walFileLengthProvider;
+  protected long currentPositionOfReader = 0;
+  protected final PriorityBlockingQueue<WALIdentity> logQueue;
+  protected final Configuration conf;
   // which region server the WALs belong to
-  private final ServerName serverName;
-  private final MetricsSource metrics;
+  protected final ServerName serverName;
+  protected final MetricsSource metrics;
+
+  protected final WALProvider walProvider;
 
   /**
    * Create an entry stream over the given queue at the given start position
    * @param logQueue the queue of WAL walIds
-   * @param fs {@link FileSystem} to use to create {@link Reader} for this 
stream
    * @param conf {@link Configuration} to use to create {@link Reader} for 
this stream
    * @param startPosition the position in the first WAL to start reading at
    * @param serverName the server name which all WALs belong to
    * @param metrics replication metrics
-   * @throws IOException
+   * @param walProvider wal provider
    */
-  public WALEntryStream(PriorityBlockingQueue<WALIdentity> logQueue, 
FileSystem fs,
-      Configuration conf, long startPosition, WALFileLengthProvider 
walFileLengthProvider,
-      ServerName serverName, MetricsSource metrics) throws IOException {
+
+  public FSWALEntryStream(FileSystem fs, PriorityBlockingQueue<WALIdentity> 
logQueue,
+      Configuration conf, long startPosition, ServerName serverName, 
MetricsSource metrics,
+      WALProvider walProvider) throws IOException {
     this.logQueue = logQueue;
-    this.fs = fs;
     this.conf = conf;
     this.currentPositionOfEntry = startPosition;
-    this.walFileLengthProvider = walFileLengthProvider;
     this.serverName = serverName;
     this.metrics = metrics;
+    this.walProvider = walProvider;
+    this.fs = fs;
+    this.eofAutoRecovery = 
conf.getBoolean("replication.source.eof.autorecovery", false);
   }
 
   /**
    * @return true if there is another WAL {@link Entry}
    */
+  @Override
   public boolean hasNext() throws IOException {
     if (currentEntry == null) {
-      tryAdvanceEntry();
+      try {
+        tryAdvanceEntry();
+      } catch (IOException e) {
+        handleIOException(logQueue.peek(), e);
+      }
     }
     return currentEntry != null;
   }
@@ -107,13 +118,15 @@ class WALEntryStream implements Closeable {
   /**
    * Returns the next WAL entry in this stream but does not advance.
    */
+  @Override
   public Entry peek() throws IOException {
-    return hasNext() ? currentEntry: null;
+    return hasNext() ? currentEntry : null;
   }
 
   /**
    * Returns the next WAL entry in this stream and advance the stream.
    */
+  @Override
   public Entry next() throws IOException {
     Entry save = peek();
     currentPositionOfEntry = currentPositionOfReader;
@@ -132,6 +145,7 @@ class WALEntryStream implements Closeable {
   /**
    * @return the position of the last Entry returned by next()
    */
+  @Override
   public long getPosition() {
     return currentPositionOfEntry;
   }
@@ -139,11 +153,12 @@ class WALEntryStream implements Closeable {
   /**
    * @return the {@link WALIdentity} of the current WAL
    */
+  @Override
   public WALIdentity getCurrentWalIdentity() {
     return currentWAlIdentity;
   }
 
-  private String getCurrentWalIdStat() {
+  protected String getCurrentWalIdStat() {
     StringBuilder sb = new StringBuilder();
     if (currentWAlIdentity != null) {
       sb.append("currently replicating from: 
").append(currentWAlIdentity).append(" at position: ")
@@ -158,13 +173,14 @@ class WALEntryStream implements Closeable {
    * Should be called if the stream is to be reused (i.e. used again after 
hasNext() has returned
    * false)
    */
+  @Override
   public void reset() throws IOException {
     if (reader != null && currentWAlIdentity != null) {
       resetReader();
     }
   }
 
-  private void setPosition(long position) {
+  protected void setPosition(long position) {
     currentPositionOfEntry = position;
   }
 
@@ -199,13 +215,109 @@ class WALEntryStream implements Closeable {
     // do nothing if we don't have a WAL Reader (e.g. if there's no logs in 
queue)
   }
 
+  private void dequeueCurrentLog() throws IOException {
+    LOG.debug("Reached the end of log {}", currentWAlIdentity);
+    closeReader();
+    logQueue.remove();
+    setPosition(0);
+    metrics.decrSizeOfLogQueue();
+  }
+
+  private void closeReader() throws IOException {
+    if (reader != null) {
+      reader.close();
+      reader = null;
+    }
+  }
+
+  // if we don't have a reader, open a reader on the next log
+  private boolean checkReader() throws IOException {
+    if (reader == null) {
+      return openNextLog();
+    }
+    return true;
+  }
+
+  // open a reader on the next log in queue
+  private boolean openNextLog() throws IOException {
+    WALIdentity nextWalId = logQueue.peek();
+    if (nextWalId != null) {
+      openReader(nextWalId);
+      if (reader != null) {
+        return true;
+      }
+    } else {
+      // no more files in queue, this could happen for recovered queue, or for 
a wal group of a sync
+      // replication peer which has already been transited to DA or S.
+      setCurrentWalId(null);
+    }
+    return false;
+  }
+
+  private void openReader(WALIdentity walId) throws IOException {
+    try {
+      // Detect if this is a new file, if so get a new reader else
+      // reset the current reader so that we see the new data
+      if (reader == null || !getCurrentWalIdentity().equals(walId)) {
+        closeReader();
+        reader = createReader(walId, conf);
+        seek();
+        setCurrentWalId(walId);
+      } else {
+        resetReader();
+      }
+    } catch (RemoteException re) {
+      IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
+      if (!(ioe instanceof FileNotFoundException)) {
+        throw ioe;
+      }
+      handleIOException(walId, ioe);
+    } catch (IOException ioe) {
+      handleIOException(walId, ioe);
+    } catch (NullPointerException npe) {
+      // Workaround for race condition in HDFS-4380
+      // which throws a NPE if we open a file before any data node has the 
most recent block
+      // Just sleep and retry. Will require re-reading compressed WALs for 
compressionContext.
+      LOG.warn("Got NPE opening reader, will retry.");
+      reader = null;
+    }
+  }
+
+  private void resetReader() throws IOException {
+    try {
+      currentEntry = null;
+      reader.reset();
+      seek();
+    } catch (IOException io) {
+      handleIOException(currentWAlIdentity, io);
+    } catch (NullPointerException npe) {
+      throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
+    }
+  }
+
+  private void seek() throws IOException {
+    if (currentPositionOfEntry != 0) {
+      reader.seek(currentPositionOfEntry);
+    }
+  }
+
+  @Override
+  public Entry next(Entry reuse) throws IOException {
+    return reader.next(reuse);
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    reader.seek(pos);
+  }
+
   // HBASE-15984 check to see we have in fact parsed all data in a cleanly 
closed file
   private boolean checkAllBytesParsed() throws IOException {
     // -1 means the wal wasn't closed cleanly.
     final long trailerSize = currentTrailerSize();
     FileStatus stat = null;
     try {
-      stat = 
fs.getFileStatus(((FSWALIdentity)this.currentWAlIdentity).getPath());
+      stat = fs.getFileStatus(((FSWALIdentity) 
this.currentWAlIdentity).getPath());
     } catch (IOException exception) {
       LOG.warn("Couldn't get file length information about log {}, it {} 
closed cleanly {}",
         currentWAlIdentity, trailerSize < 0 ? "was not" : "was", 
getCurrentWalIdStat());
@@ -222,16 +334,15 @@ class WALEntryStream implements Closeable {
         if (currentPositionOfReader < stat.getLen()) {
           final long skippedBytes = stat.getLen() - currentPositionOfReader;
           LOG.debug(
-            "Reached the end of WAL file '{}'. It was not closed cleanly," +
-              " so we did not parse {} bytes of data. This is normally ok.",
+            "Reached the end of WAL file '{}'. It was not closed cleanly,"
+                + " so we did not parse {} bytes of data. This is normally 
ok.",
             currentWAlIdentity, skippedBytes);
           metrics.incrUncleanlyClosedWALs();
           metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
         }
       } else if (currentPositionOfReader + trailerSize < stat.getLen()) {
-        LOG.warn(
-          "Processing end of WAL file '{}'. At position {}, which is too far 
away from" +
-            " reported file length {}. Restarting WAL reading (see HBASE-15983 
for details). {}",
+        LOG.warn("Processing end of WAL file '{}'. At position {}, which is 
too far away from"
+            + " reported file length {}. Restarting WAL reading (see 
HBASE-15983 for details). {}",
           currentWAlIdentity, currentPositionOfReader, stat.getLen(), 
getCurrentWalIdStat());
         setPosition(0);
         resetReader();
@@ -248,72 +359,6 @@ class WALEntryStream implements Closeable {
     return true;
   }
 
-  private void dequeueCurrentLog() throws IOException {
-    LOG.debug("Reached the end of log {}", currentWAlIdentity);
-    closeReader();
-    logQueue.remove();
-    setPosition(0);
-    metrics.decrSizeOfLogQueue();
-  }
-
-  /**
-   * Returns whether the file is opened for writing.
-   */
-  private boolean readNextEntryAndRecordReaderPosition() throws IOException {
-    Entry readEntry = reader.next();
-    long readerPos = reader.getPosition();
-    OptionalLong fileLength =
-        walFileLengthProvider.getLogFileSizeIfBeingWritten(currentWAlIdentity);
-    if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) {
-      // see HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible 
that we read uncommitted
-      // data, so we need to make sure that we do not read beyond the 
committed file length.
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("The provider tells us the valid length for " + 
currentWAlIdentity + " is " +
-            fileLength.getAsLong() + ", but we have advanced to " + readerPos);
-      }
-      resetReader();
-      return true;
-    }
-    if (readEntry != null) {
-      metrics.incrLogEditsRead();
-      metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry);
-    }
-    currentEntry = readEntry; // could be null
-    this.currentPositionOfReader = readerPos;
-    return fileLength.isPresent();
-  }
-
-  private void closeReader() throws IOException {
-    if (reader != null) {
-      reader.close();
-      reader = null;
-    }
-  }
-
-  // if we don't have a reader, open a reader on the next log
-  private boolean checkReader() throws IOException {
-    if (reader == null) {
-      return openNextLog();
-    }
-    return true;
-  }
-
-  // open a reader on the next log in queue
-  private boolean openNextLog() throws IOException {
-    WALIdentity nextWalId = logQueue.peek();
-    if (nextWalId != null) {
-      openReader((FSWALIdentity)nextWalId);
-      if (reader != null) {
-        return true;
-      }
-    } else {
-      // no more files in queue, this could happen for recovered queue, or for 
a wal group of a sync
-      // replication peer which has already been transited to DA or S.
-      setCurrentWalId(null);
-    }
-    return false;
-  }
-
   private Path getArchivedLog(Path path) throws IOException {
     Path rootDir = FSUtils.getRootDir(conf);
 
@@ -326,9 +371,8 @@ class WALEntryStream implements Closeable {
     }
 
     // Try found the log in the seperate old log dir
-    oldLogDir =
-        new Path(rootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME)
-            
.append(Path.SEPARATOR).append(serverName.getServerName()).toString());
+    oldLogDir = new Path(rootDir, new 
StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME)
+        .append(Path.SEPARATOR).append(serverName.getServerName()).toString());
     archivedLogLocation = new Path(oldLogDir, path.getName());
     if (fs.exists(archivedLogLocation)) {
       LOG.info("Log " + path + " was moved to " + archivedLogLocation);
@@ -341,6 +385,7 @@ class WALEntryStream implements Closeable {
 
   private void handleFileNotFound(FSWALIdentity walId, FileNotFoundException 
fnfe)
       throws IOException {
+
     // If the log was archived, continue reading from there
     FSWALIdentity archivedLog = new 
FSWALIdentity(getArchivedLog(walId.getPath()));
     if (!walId.equals(archivedLog)) {
@@ -350,38 +395,6 @@ class WALEntryStream implements Closeable {
     }
   }
 
-  private void openReader(FSWALIdentity walId) throws IOException {
-    try {
-      // Detect if this is a new file, if so get a new reader else
-      // reset the current reader so that we see the new data
-      if (reader == null || !getCurrentWalIdentity().equals(walId)) {
-        closeReader();
-        reader = WALFactory.createReader(fs, walId.getPath(), conf);
-        seek();
-        setCurrentWalId(walId);
-      } else {
-        resetReader();
-      }
-    } catch (FileNotFoundException fnfe) {
-      handleFileNotFound(walId, fnfe);
-    }  catch (RemoteException re) {
-      IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
-      if (!(ioe instanceof FileNotFoundException)) throw ioe;
-      handleFileNotFound(walId, (FileNotFoundException)ioe);
-    } catch (LeaseNotRecoveredException lnre) {
-      // HBASE-15019 the WAL was not closed due to some hiccup.
-      LOG.warn("Try to recover the WAL lease " + currentWAlIdentity, lnre);
-      recoverLease(conf, ((FSWALIdentity)currentWAlIdentity).getPath());
-      reader = null;
-    } catch (NullPointerException npe) {
-      // Workaround for race condition in HDFS-4380
-      // which throws a NPE if we open a file before any data node has the 
most recent block
-      // Just sleep and retry. Will require re-reading compressed WALs for 
compressionContext.
-      LOG.warn("Got NPE opening reader, will retry.");
-      reader = null;
-    }
-  }
-
   // For HBASE-15019
   private void recoverLease(final Configuration conf, final Path path) {
     try {
@@ -399,28 +412,18 @@ class WALEntryStream implements Closeable {
     }
   }
 
-  private void resetReader() throws IOException {
+  private void handleIOException(WALIdentity walId, IOException e) throws 
IOException {
     try {
-      currentEntry = null;
-      reader.reset();
-      seek();
+      throw e;
     } catch (FileNotFoundException fnfe) {
-      // If the log was archived, continue reading from there
-      FSWALIdentity archivedLog =
-          new FSWALIdentity(getArchivedLog(((FSWALIdentity) 
currentWAlIdentity).getPath()));
-      if (!currentWAlIdentity.equals(archivedLog)) {
-        openReader(archivedLog);
-      } else {
-        throw fnfe;
-      }
-    } catch (NullPointerException npe) {
-      throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
-    }
-  }
-
-  private void seek() throws IOException {
-    if (currentPositionOfEntry != 0) {
-      reader.seek(currentPositionOfEntry);
+      handleFileNotFound((FSWALIdentity) walId, fnfe);
+    } catch (EOFException eo) {
+      handleEofException(eo);
+    } catch (LeaseNotRecoveredException lnre) {
+      // HBASE-15019 the WAL was not closed due to some hiccup.
+      LOG.warn("Try to recover the WAL lease " + currentWAlIdentity, lnre);
+      recoverLease(conf, ((FSWALIdentity) currentWAlIdentity).getPath());
+      reader = null;
     }
   }
 
@@ -432,4 +435,64 @@ class WALEntryStream implements Closeable {
     }
     return size;
   }
+
+  // if we get an EOF due to a zero-length log, and there are other logs in 
queue
+  // (highly likely we've closed the current log), we've hit the max retries, 
and autorecovery is
+  // enabled, then dump the log
+  private void handleEofException(IOException e) {
+    if ((e instanceof EOFException || e.getCause() instanceof EOFException) && 
logQueue.size() > 1
+        && this.eofAutoRecovery) {
+      WALIdentity walId = logQueue.peek();
+      try {
+        Path path = ((FSWALIdentity) walId).getPath();
+        if (fs.getFileStatus(path).getLen() == 0) {
+          LOG.warn("Forcing removal of 0 length log in queue: {} ", walId);
+          logQueue.remove();
+          currentPositionOfEntry = 0;
+        }
+      } catch (IOException ioe) {
+        LOG.warn("Couldn't get file length information about log: {} ", walId);
+      }
+    }
+  }
+
+  private Reader createReader(WALIdentity walId, Configuration conf) throws 
IOException {
+    Path path = ((FSWALIdentity) walId).getPath();
+    return WALFactory.createReader(fs, path, conf);
+  }
+
+  /**
+   * Returns whether the file is opened for writing.
+   */
+  protected boolean readNextEntryAndRecordReaderPosition() throws IOException {
+    Entry readEntry = reader.next();
+    long readerPos = reader.getPosition();
+    OptionalLong fileLength = getWALFileLengthProvider()
+        .getLogFileSizeIfBeingWritten(((FSWALIdentity) 
currentWAlIdentity).getPath());
+    if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) {
+      // see HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible 
that we read uncommitted
+      // data, so we need to make sure that we do not read beyond the 
committed file length.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("The provider tells us the valid length for " + 
currentWAlIdentity + " is "
+            + fileLength.getAsLong() + ", but we have advanced to " + 
readerPos);
+      }
+      resetReader();
+      return true;
+    }
+    if (readEntry != null) {
+      metrics.incrLogEditsRead();
+      metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry);
+    }
+    currentEntry = readEntry; // could be null
+    this.currentPositionOfReader = readerPos;
+    return fileLength.isPresent();
+  }
+
+  @VisibleForTesting
+  public WALFileLengthProvider getWALFileLengthProvider() {
+    return path -> this.walProvider.getWALs().stream()
+        .map(w -> w.getLogFileSizeIfBeingWritten(path)).filter(o -> 
o.isPresent()).findAny()
+        .orElse(OptionalLong.empty());
+  }
+
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 7db53aa..d0bb50c 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -86,7 +86,6 @@ public class HBaseInterClusterReplicationEndpoint extends 
HBaseReplicationEndpoi
   private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2;
 
   private ClusterConnection conn;
-  private Configuration localConf;
   private Configuration conf;
   // How long should we sleep for each retry
   private long sleepForRetries;
@@ -117,7 +116,6 @@ public class HBaseInterClusterReplicationEndpoint extends 
HBaseReplicationEndpoi
   public void init(Context context) throws IOException {
     super.init(context);
     this.conf = HBaseConfiguration.create(ctx.getConfiguration());
-    this.localConf = HBaseConfiguration.create(ctx.getLocalConfiguration());
     decorateConf();
     this.maxRetriesMultiplier = 
this.conf.getInt("replication.source.maxretriesmultiplier", 300);
     this.socketTimeoutMultiplier = 
this.conf.getInt("replication.source.socketTimeoutMultiplier",
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index 4bb1fe3..d954082 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -18,21 +18,15 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.PriorityBlockingQueue;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
-import org.apache.hadoop.hbase.wal.FSWALIdentity;
 import org.apache.hadoop.hbase.wal.WALIdentity;
+import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,17 +38,16 @@ import org.slf4j.LoggerFactory;
 @InterfaceAudience.Private
 public class RecoveredReplicationSource extends ReplicationSource {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(RecoveredReplicationSource.class);
-
   private String actualPeerId;
+  private static final Logger LOG = 
LoggerFactory.getLogger(RecoveredReplicationSource.class);
 
   @Override
-  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager 
manager,
+  public void init(Configuration conf, ReplicationSourceManager manager,
       ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, 
Server server,
-      String peerClusterZnode, UUID clusterId, WALFileLengthProvider 
walFileLengthProvider,
-      MetricsSource metrics) throws IOException {
-    super.init(conf, fs, manager, queueStorage, replicationPeer, server, 
peerClusterZnode,
-      clusterId, walFileLengthProvider, metrics);
+      String peerClusterZnode, UUID clusterId, MetricsSource metrics, 
WALProvider walProvider)
+      throws IOException {
+    super.init(conf, manager, queueStorage, replicationPeer, server, 
peerClusterZnode, clusterId,
+      metrics, walProvider);
     this.actualPeerId = this.replicationQueueInfo.getPeerId();
   }
 
@@ -64,88 +57,6 @@ public class RecoveredReplicationSource extends 
ReplicationSource {
     return new RecoveredReplicationSourceShipper(conf, walGroupId, queue, 
this, queueStorage);
   }
 
-  public void locateRecoveredWalIds(PriorityBlockingQueue<WALIdentity> queue) 
throws IOException {
-    boolean hasPathChanged = false;
-    PriorityBlockingQueue<WALIdentity> newWalIds =
-        new PriorityBlockingQueue<WALIdentity>(queueSizePerGroup, new 
LogsComparator());
-    pathsLoop: for (WALIdentity walId : queue) {
-      if (fs.exists(((FSWALIdentity) walId).getPath())) {
-        // still in same location, don't need to
-        // do anything
-        newWalIds.add(walId);
-        continue;
-      }
-      // Path changed - try to find the right path.
-      hasPathChanged = true;
-      if (server instanceof ReplicationSyncUp.DummyServer) {
-        // In the case of disaster/recovery, HMaster may be shutdown/crashed 
before flush data
-        // from .logs to .oldlogs. Loop into .logs folders and check whether a 
match exists
-        Path newPath = getReplSyncUpPath(((FSWALIdentity)walId).getPath());
-        newWalIds.add(new FSWALIdentity(newPath));
-        continue;
-      } else {
-        // See if Path exists in the dead RS folder (there could be a chain of 
failures
-        // to look at)
-        List<ServerName> deadRegionServers = 
this.replicationQueueInfo.getDeadRegionServers();
-        LOG.info("NB dead servers : " + deadRegionServers.size());
-        final Path walDir = FSUtils.getWALRootDir(conf);
-        for (ServerName curDeadServerName : deadRegionServers) {
-          final Path deadRsDirectory =
-              new Path(walDir, 
AbstractFSWALProvider.getWALDirectoryName(curDeadServerName
-                  .getServerName()));
-          Path[] locs = new Path[] { new Path(deadRsDirectory, 
walId.getName()), new Path(
-              deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), 
walId.getName()) };
-          for (Path possibleLogLocation : locs) {
-            LOG.info("Possible location " + 
possibleLogLocation.toUri().toString());
-            if (manager.getFs().exists(possibleLogLocation)) {
-              // We found the right new location
-              LOG.info("Log " + walId + " still exists at " + 
possibleLogLocation);
-              newWalIds.add(new FSWALIdentity(possibleLogLocation));
-              continue pathsLoop;
-            }
-          }
-        }
-        // didn't find a new location
-        LOG.error(
-          String.format("WAL Path %s doesn't exist and couldn't find its new 
location", walId));
-        newWalIds.add(walId);
-      }
-    }
-
-    if (hasPathChanged) {
-      if (newWalIds.size() != queue.size()) { // this shouldn't happen
-        LOG.error("Recovery queue size is incorrect");
-        throw new IOException("Recovery queue size error");
-      }
-      // put the correct locations in the queue
-      // since this is a recovered queue with no new incoming logs,
-      // there shouldn't be any concurrency issues
-      queue.clear();
-      for (WALIdentity walId : newWalIds) {
-        queue.add(walId);
-      }
-    }
-  }
-
-  // N.B. the ReplicationSyncUp tool sets the manager.getWALDir to the root of 
the wal
-  // area rather than to the wal area for a particular region server.
-  private Path getReplSyncUpPath(Path path) throws IOException {
-    FileStatus[] rss = fs.listStatus(manager.getLogDir());
-    for (FileStatus rs : rss) {
-      Path p = rs.getPath();
-      FileStatus[] logs = fs.listStatus(p);
-      for (FileStatus log : logs) {
-        p = new Path(p, log.getPath().getName());
-        if (p.getName().equals(path.getName())) {
-          LOG.info("Log " + p.getName() + " found at " + p);
-          return p;
-        }
-      }
-    }
-    LOG.error("Didn't find path for: " + path.getName());
-    return path;
-  }
-
   void tryFinish() {
     if (workerThreads.isEmpty()) {
       this.getSourceMetrics().clear();
@@ -167,4 +78,36 @@ public class RecoveredReplicationSource extends 
ReplicationSource {
   public boolean isRecovered() {
     return true;
   }
+
+  /**
+   * Get the updated queue of the wals if the wals are moved to another 
location.
+   * @param queue Updated queue with the new walIds
+   * @throws IOException IOException
+   */
+  public void locateRecoveredWalIds(PriorityBlockingQueue<WALIdentity> queue) 
throws IOException {
+    PriorityBlockingQueue<WALIdentity> newWalIds =
+        new PriorityBlockingQueue<WALIdentity>(queueSizePerGroup, new 
LogsComparator());
+    boolean hasPathChanged = false;
+    for (WALIdentity wal : queue) {
+      WALIdentity updateRecoveredWalIds = 
this.getWalProvider().locateWalId(wal, server,
+        this.replicationQueueInfo.getDeadRegionServers());
+      if (!updateRecoveredWalIds.equals(wal)) {
+        hasPathChanged = true;
+      }
+      newWalIds.add(updateRecoveredWalIds);
+    }
+    if (hasPathChanged) {
+      if (newWalIds.size() != queue.size()) { // this shouldn't happen
+        LOG.error("Recovery queue size is incorrect");
+        throw new IOException("Recovery queue size error");
+      }
+      // put the correct locations in the queue
+      // since this is a recovered queue with no new incoming logs,
+      // there shouldn't be any concurrency issues
+      queue.clear();
+      for (WALIdentity walId : newWalIds) {
+        queue.add(walId);
+      }
+    }
+  }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 799d975..eef9ddd 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -20,13 +20,11 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.OptionalLong;
 import java.util.UUID;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HConstants;
@@ -86,7 +84,7 @@ public class Replication implements ReplicationSourceService, 
ReplicationSinkSer
   }
 
   @Override
-  public void initialize(Server server, FileSystem fs, Path logDir, Path 
oldLogDir,
+  public void initialize(Server server,
       WALProvider walProvider) throws IOException {
     this.server = server;
     this.conf = this.server.getConfiguration();
@@ -125,9 +123,7 @@ public class Replication implements 
ReplicationSourceService, ReplicationSinkSer
     }
     SyncReplicationPeerMappingManager mapping = new 
SyncReplicationPeerMappingManager();
     this.replicationManager = new ReplicationSourceManager(queueStorage, 
replicationPeers,
-        replicationTracker, conf, this.server, fs, logDir, oldLogDir, 
clusterId,
-        walProvider != null ? walProvider.getWALFileLengthProvider() : p -> 
OptionalLong.empty(),
-        mapping);
+        replicationTracker, conf, this.server, clusterId, mapping, 
walProvider);
     this.syncReplicationPeerInfoProvider =
         new SyncReplicationPeerInfoProviderImpl(replicationPeers, mapping);
     PeerActionListener peerActionListener = PeerActionListener.DUMMY;
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 12c63fd..2db5a96 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -17,9 +17,6 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
-import static 
org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getArchivedLogPath;
-
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
@@ -37,7 +34,6 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
@@ -61,9 +57,9 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
-import org.apache.hadoop.hbase.wal.FSWALIdentity;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALIdentity;
+import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -103,7 +99,6 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
   protected Server server;
   // How long should we sleep for each retry
   private long sleepForRetries;
-  protected FileSystem fs;
   // id of this cluster
   private UUID clusterId;
   // total number of edits we replicated
@@ -126,7 +121,6 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
   private ReplicationThrottler throttler;
   private long defaultBandwidth;
   private long currentBandwidth;
-  private WALFileLengthProvider walFileLengthProvider;
   @VisibleForTesting
   protected final ConcurrentHashMap<String, ReplicationSourceShipper> 
workerThreads =
       new ConcurrentHashMap<>();
@@ -139,22 +133,13 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
   private int waitOnEndpointSeconds = -1;
 
   private Thread initThread;
+  protected WALProvider walProvider;
 
-  /**
-   * Instantiation method used by region servers
-   * @param conf configuration to use
-   * @param fs file system to use
-   * @param manager replication manager to ping to
-   * @param server the server for this region server
-   * @param queueId the id of our replication queue
-   * @param clusterId unique UUID for the cluster
-   * @param metrics metrics for replication source
-   */
   @Override
-  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager 
manager,
+  public void init(Configuration conf, ReplicationSourceManager manager,
       ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, 
Server server,
-      String queueId, UUID clusterId, WALFileLengthProvider 
walFileLengthProvider,
-      MetricsSource metrics) throws IOException {
+      String queueId, UUID clusterId, MetricsSource metrics, WALProvider 
walProvider)
+      throws IOException {
     this.server = server;
     this.conf = HBaseConfiguration.create(conf);
     this.waitOnEndpointSeconds =
@@ -170,7 +155,6 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
     this.manager = manager;
     this.metrics = metrics;
     this.clusterId = clusterId;
-    this.fs = fs;
     this.queueId = queueId;
     this.replicationQueueInfo = new ReplicationQueueInfo(queueId);
     this.logQueueWarnThreshold = 
this.conf.getInt("replication.source.log.queue.warn", 2);
@@ -179,7 +163,7 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
     currentBandwidth = getCurrentBandwidth();
     this.throttler = new ReplicationThrottler((double) currentBandwidth / 
10.0);
     this.totalBufferUsed = manager.getTotalBufferUsed();
-    this.walFileLengthProvider = walFileLengthProvider;
+    this.walProvider = walProvider;
     LOG.info("queueId={}, ReplicationSource : {}, currentBandwidth={}", 
queueId,
       replicationPeer.getId(), this.currentBandwidth);
   }
@@ -283,7 +267,7 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
       tableDescriptors = ((HRegionServer) server).getTableDescriptors();
     }
     replicationEndpoint
-      .init(new ReplicationEndpoint.Context(conf, 
replicationPeer.getConfiguration(), fs,
+      .init(new ReplicationEndpoint.Context(conf, 
replicationPeer.getConfiguration(),
         replicationPeer.getId(), clusterId, replicationPeer, metrics, 
tableDescriptors, server));
     replicationEndpoint.start();
     replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS);
@@ -320,7 +304,8 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
   @Override
   public Map<String, ReplicationStatus> getWalGroupStatus() {
     Map<String, ReplicationStatus> sourceReplicationStatus = new TreeMap<>();
-    long lastTimeStamp, ageOfLastShippedOp, replicationDelay, fileSize;
+    long lastTimeStamp, ageOfLastShippedOp, replicationDelay;
+    long logSize = -1;
     for (Map.Entry<String, ReplicationSourceShipper> walGroupShipper : 
workerThreads.entrySet()) {
       String walGroupId = walGroupShipper.getKey();
       ReplicationSourceShipper shipper = walGroupShipper.getValue();
@@ -330,19 +315,14 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
       replicationDelay =
           ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, 
lastTimeStamp, queueSize);
       WALIdentity currentPath = shipper.getCurrentWALIdentity();
-      try {
-        fileSize = getFileSize(((FSWALIdentity)currentPath).getPath());
-      } catch (IOException e) {
-        LOG.warn("Ignore the exception as the file size of HLog only affects 
the web ui", e);
-        fileSize = -1;
-      }
+      //TODO Fix log size
       ReplicationStatus.ReplicationStatusBuilder statusBuilder = 
ReplicationStatus.newBuilder();
       statusBuilder.withPeerId(this.getPeerId())
           .withQueueSize(queueSize)
           .withWalGroup(walGroupId)
           .withCurrentWalId(currentPath)
           .withCurrentPosition(shipper.getCurrentPosition())
-          .withFileSize(fileSize)
+          .withFileSize(logSize)
           .withAgeOfLastShippedOp(ageOfLastShippedOp)
           .withReplicationDelay(replicationDelay);
       sourceReplicationStatus.put(this.getPeerId() + "=>" + walGroupId, 
statusBuilder.build());
@@ -350,16 +330,6 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
     return sourceReplicationStatus;
   }
 
-  private long getFileSize(Path currentPath) throws IOException {
-    long fileSize;
-    try {
-      fileSize = fs.getContentSummary(currentPath).getLength();
-    } catch (FileNotFoundException e) {
-      currentPath = getArchivedLogPath(currentPath, conf);
-      fileSize = fs.getContentSummary(currentPath).getLength();
-    }
-    return fileSize;
-  }
 
   protected ReplicationSourceShipper createNewShipper(String walGroupId,
       PriorityBlockingQueue<WALIdentity> queue) {
@@ -369,8 +339,8 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
   private ReplicationSourceWALReader createNewWALReader(String walGroupId,
       PriorityBlockingQueue<WALIdentity> queue, long startPosition) {
     return replicationPeer.getPeerConfig().isSerial()
-      ? new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, 
walEntryFilter, this)
-      : new ReplicationSourceWALReader(fs, conf, queue, startPosition, 
walEntryFilter, this);
+      ? new SerialReplicationSourceWALReader(conf, queue, startPosition, 
walEntryFilter, this)
+      : new ReplicationSourceWALReader(conf, queue, startPosition, 
walEntryFilter, this);
   }
 
   protected final void uncaughtException(Thread t, Throwable e) {
@@ -663,7 +633,7 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
 
   @Override
   //offsets totalBufferUsed by deducting shipped batchSize.
-  public void postShipEdits(List<Entry> entries, int batchSize) {
+  public void postShipEdits(List<Entry> entries, long batchSize) {
     if (throttler.isEnabled()) {
       throttler.addPushSize(batchSize);
     }
@@ -672,11 +642,6 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
   }
 
   @Override
-  public WALFileLengthProvider getWALFileLengthProvider() {
-    return walFileLengthProvider;
-  }
-
-  @Override
   public ServerName getServerWALsBelongTo() {
     return server.getServerName();
   }
@@ -697,4 +662,9 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
   void removeWorker(ReplicationSourceShipper worker) {
     workerThreads.remove(worker.walGroupId, worker);
   }
+
+  public WALProvider getWalProvider() {
+    return walProvider;
+  }
+
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 3058fcc..d86bb74 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -23,9 +23,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
@@ -37,6 +35,7 @@ import 
org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALIdentity;
+import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -48,14 +47,20 @@ public interface ReplicationSourceInterface {
   /**
    * Initializer for the source
    * @param conf the configuration to use
-   * @param fs the file system to use
    * @param manager the manager to use
+   * @param queueStorage replication queue storage
+   * @param replicationPeer Replication Peer
    * @param server the server for this region server
+   * @param queueId Id of the queue
+   * @param clusterId id of the cluster
+   * @param metrics metric source for publishing replication metrics
+   * @param walProvider wal provider
+   * @throws IOException IOException
    */
-  void init(Configuration conf, FileSystem fs, ReplicationSourceManager 
manager,
+  void init(Configuration conf, ReplicationSourceManager manager,
       ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, 
Server server,
-      String queueId, UUID clusterId, WALFileLengthProvider 
walFileLengthProvider,
-      MetricsSource metrics) throws IOException;
+      String queueId, UUID clusterId, MetricsSource metrics, WALProvider 
walProvider)
+      throws IOException;
 
   /**
    * Add a log to the list of logs to replicate
@@ -160,11 +165,6 @@ public interface ReplicationSourceInterface {
   ReplicationSourceManager getSourceManager();
 
   /**
-   * @return the wal file length provider
-   */
-  WALFileLengthProvider getWALFileLengthProvider();
-
-  /**
    * Try to throttle when the peer config with a bandwidth
    * @param batchSize entries size will be pushed
    * @throws InterruptedException
@@ -176,7 +176,7 @@ public interface ReplicationSourceInterface {
    * @param entries pushed
    * @param batchSize entries size pushed
    */
-  void postShipEdits(List<Entry> entries, int batchSize);
+  void postShipEdits(List<Entry> entries, long batchSize);
 
   /**
    * The queue of WALs only belong to one region server. This will return the 
server name which all
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index dd31a01..595d278 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -63,9 +63,9 @@ import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
-import org.apache.hadoop.hbase.wal.FSWALIdentity;
 import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
 import org.apache.hadoop.hbase.wal.WALIdentity;
+import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -149,14 +149,8 @@ public class ReplicationSourceManager implements 
ReplicationListener {
   private final SyncReplicationPeerMappingManager 
syncReplicationPeerMappingManager;
 
   private final Configuration conf;
-  private final FileSystem fs;
   // The paths to the latest log of each wal group, for new coming peers
   private final Map<String, WALIdentity> latestWalIds;
-  // Path to the wals directories
-  private final Path logDir;
-  // Path to the wal archive
-  private final Path oldLogDir;
-  private final WALFileLengthProvider walFileLengthProvider;
   // The number of ms that we wait before moving znodes, HBASE-3596
   private final long sleepBeforeFailover;
   // Homemade executer service for replication
@@ -172,6 +166,7 @@ public class ReplicationSourceManager implements 
ReplicationListener {
   // Maximum number of retries before taking bold actions when deleting remote 
wal files for sync
   // replication peer.
   private final int maxRetriesMultiplier;
+  private final WALProvider walProvider;
 
   /**
    * Creates a replication manager and sets the watch on all the other 
registered region servers
@@ -180,16 +175,14 @@ public class ReplicationSourceManager implements 
ReplicationListener {
    * @param replicationTracker
    * @param conf the configuration to use
    * @param server the server for this region server
-   * @param fs the file system to use
-   * @param logDir the directory that contains all wal directories of live RSs
-   * @param oldLogDir the directory where old logs are archived
-   * @param clusterId
+   * @param clusterId id of the cluster
+   * @param walProvider Wal Provider
    */
   public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
       ReplicationPeers replicationPeers, ReplicationTracker 
replicationTracker, Configuration conf,
-      Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID 
clusterId,
-      WALFileLengthProvider walFileLengthProvider,
-      SyncReplicationPeerMappingManager syncReplicationPeerMappingManager) 
throws IOException {
+      Server server, UUID clusterId,
+      SyncReplicationPeerMappingManager syncReplicationPeerMappingManager, 
WALProvider walProvider)
+      throws IOException {
     this.sources = new ConcurrentHashMap<>();
     this.queueStorage = queueStorage;
     this.replicationPeers = replicationPeers;
@@ -199,13 +192,10 @@ public class ReplicationSourceManager implements 
ReplicationListener {
     this.walsByIdRecoveredQueues = new ConcurrentHashMap<>();
     this.oldsources = new ArrayList<>();
     this.conf = conf;
-    this.fs = fs;
-    this.logDir = logDir;
-    this.oldLogDir = oldLogDir;
+    this.walProvider = walProvider;
     // 30 seconds
     this.sleepBeforeFailover = 
conf.getLong("replication.sleep.before.failover", 30000);
     this.clusterId = clusterId;
-    this.walFileLengthProvider = walFileLengthProvider;
     this.syncReplicationPeerMappingManager = syncReplicationPeerMappingManager;
     this.replicationTracker.registerListener(this);
     // It's preferable to failover 1 RS at a time, but with good zk servers
@@ -352,8 +342,8 @@ public class ReplicationSourceManager implements 
ReplicationListener {
 
     MetricsSource metrics = new MetricsSource(queueId);
     // init replication source
-    src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, 
clusterId,
-      walFileLengthProvider, metrics);
+    src.init(conf, this, queueStorage, replicationPeer, server, queueId, 
clusterId,
+       metrics, walProvider);
     return src;
   }
 
@@ -487,7 +477,8 @@ public class ReplicationSourceManager implements 
ReplicationListener {
         toRemove.terminate(terminateMessage);
       }
       for (NavigableSet<String> walsByGroup : walsById.get(peerId).values()) {
-        walsByGroup.forEach(wal -> src.enqueueLog(new FSWALIdentity(new 
Path(this.logDir, wal))));
+        walsByGroup.forEach(
+          wal -> 
src.enqueueLog(walProvider.createWalIdentity(server.getServerName(), wal, 
false)));
       }
     }
     LOG.info("Startup replication source for " + src.getPeerId());
@@ -508,7 +499,8 @@ public class ReplicationSourceManager implements 
ReplicationListener {
         ReplicationSourceInterface replicationSource = createSource(queueId, 
peer);
         this.oldsources.add(replicationSource);
         for (SortedSet<String> walsByGroup : 
walsByIdRecoveredQueues.get(queueId).values()) {
-          walsByGroup.forEach(wal -> src.enqueueLog(new FSWALIdentity(wal)));
+          walsByGroup.forEach(wal -> src
+              
.enqueueLog(walProvider.createWalIdentity(server.getServerName(), wal, false)));
         }
         toStartup.add(replicationSource);
       }
@@ -675,6 +667,8 @@ public class ReplicationSourceManager implements 
ReplicationListener {
   private void removeRemoteWALs(String peerId, String remoteWALDir, 
Collection<String> wals)
       throws IOException {
     Path remoteWALDirForPeer = 
ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
+    //Currently Sync replication is only supported on FS based WALProvider
+    //TODO: Abstract FileSystem once all APIs for Sync replication is conciled 
for remote calls.
     FileSystem fs = ReplicationUtils.getRemoteWALFileSystem(conf, 
remoteWALDir);
     for (String wal : wals) {
       Path walFile = new Path(remoteWALDirForPeer, wal);
@@ -964,7 +958,7 @@ public class ReplicationSourceManager implements 
ReplicationListener {
             }
             oldsources.add(src);
             for (String wal : walsSet) {
-              src.enqueueLog(new FSWALIdentity(new Path(oldLogDir, wal)));
+              
src.enqueueLog(walProvider.createWalIdentity(server.getServerName(), wal, 
true));
             }
             src.startup();
           }
@@ -1060,30 +1054,6 @@ public class ReplicationSourceManager implements 
ReplicationListener {
   }
 
   /**
-   * Get the directory where wals are archived
-   * @return the directory where wals are archived
-   */
-  public Path getOldLogDir() {
-    return this.oldLogDir;
-  }
-
-  /**
-   * Get the directory where wals are stored by their RSs
-   * @return the directory where wals are stored by their RSs
-   */
-  public Path getLogDir() {
-    return this.logDir;
-  }
-
-  /**
-   * Get the handle on the local file system
-   * @return Handle on the local file system
-   */
-  public FileSystem getFs() {
-    return this.fs;
-  }
-
-  /**
    * Get the ReplicationPeers used by this ReplicationSourceManager
    * @return the ReplicationPeers used by this ReplicationSourceManager
    */
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 8ecd5bd..23c2088 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -148,8 +148,8 @@ public class ReplicationSourceShipper extends Thread {
    * get batchEntry size excludes bulk load file sizes.
    * Uses ReplicationSourceWALReader's static method.
    */
-  private int getBatchEntrySizeExcludeBulkLoad(WALEntryBatch entryBatch) {
-    int totalSize = 0;
+  private long getBatchEntrySizeExcludeBulkLoad(WALEntryBatch entryBatch) {
+    long totalSize = 0;
     for(Entry entry : entryBatch.getWalEntries()) {
       totalSize += entryReader.getEntrySizeExcludeBulkLoad(entry);
     }
@@ -172,7 +172,7 @@ public class ReplicationSourceShipper extends Thread {
       return;
     }
     int currentSize = (int) entryBatch.getHeapSize();
-    int sizeExcludeBulkLoad = getBatchEntrySizeExcludeBulkLoad(entryBatch);
+    long sizeExcludeBulkLoad = getBatchEntrySizeExcludeBulkLoad(entryBatch);
     while (isActive()) {
       try {
         try {
@@ -290,7 +290,7 @@ public class ReplicationSourceShipper extends Thread {
   public void startup(UncaughtExceptionHandler handler) {
     String name = Thread.currentThread().getName();
     Threads.setDaemonThreadRunning(this,
-      name + ".replicationSource.shipper" + walGroupId + "," + 
source.getQueueId(), handler);
+      name + ".replicationSource.shipper " + walGroupId + "," + 
source.getQueueId(), handler);
   }
 
   WALIdentity getCurrentWALIdentity() {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index a0b2ecd..babeab7 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -18,7 +18,6 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
-import java.io.EOFException;
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
@@ -27,14 +26,12 @@ import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.wal.FSWALIdentity;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALIdentity;
@@ -57,7 +54,6 @@ class ReplicationSourceWALReader extends Thread {
   private static final Logger LOG = 
LoggerFactory.getLogger(ReplicationSourceWALReader.class);
 
   private final PriorityBlockingQueue<WALIdentity> logQueue;
-  private final FileSystem fs;
   private final Configuration conf;
   private final WALEntryFilter filter;
   private final ReplicationSource source;
@@ -71,7 +67,6 @@ class ReplicationSourceWALReader extends Thread {
   private long currentPosition;
   private final long sleepForRetries;
   private final int maxRetriesMultiplier;
-  private final boolean eofAutoRecovery;
 
   //Indicates whether this particular worker is running
   private boolean isReaderRunning = true;
@@ -82,19 +77,16 @@ class ReplicationSourceWALReader extends Thread {
   /**
    * Creates a reader worker for a given WAL queue. Reads WAL entries off a 
given queue, batches the
    * entries, and puts them on a batch queue.
-   * @param fs the files system to use
    * @param conf configuration to use
    * @param logQueue The WAL queue to read off of
    * @param startPosition position in the first WAL to start reading from
    * @param filter The filter to use while reading
    * @param source replication source
    */
-  public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
-      PriorityBlockingQueue<WALIdentity> logQueue, long startPosition, 
WALEntryFilter filter,
-      ReplicationSource source) {
+  public ReplicationSourceWALReader(Configuration conf, 
PriorityBlockingQueue<WALIdentity> logQueue,
+      long startPosition, WALEntryFilter filter, ReplicationSource source) {
     this.logQueue = logQueue;
     this.currentPosition = startPosition;
-    this.fs = fs;
     this.conf = conf;
     this.filter = filter;
     this.source = source;
@@ -111,7 +103,6 @@ class ReplicationSourceWALReader extends Thread {
         this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 
second
     this.maxRetriesMultiplier =
         this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 
minutes @ 1 sec per
-    this.eofAutoRecovery = 
conf.getBoolean("replication.source.eof.autorecovery", false);
     this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
     LOG.info("peerClusterZnode=" + source.getQueueId()
         + ", ReplicationSourceWALReaderThread : " + source.getPeerId()
@@ -124,10 +115,9 @@ class ReplicationSourceWALReader extends Thread {
   public void run() {
     int sleepMultiplier = 1;
     while (isReaderRunning()) { // we only loop back here if something fatal 
happened to our stream
-      try (WALEntryStream entryStream =
-          new WALEntryStream(logQueue, fs, conf, currentPosition,
-              source.getWALFileLengthProvider(), 
source.getServerWALsBelongTo(),
-              source.getSourceMetrics())) {
+      try (WALEntryStream entryStream = 
this.source.getWalProvider().getWalStream(logQueue, conf,
+        currentPosition, source.getServerWALsBelongTo(),
+        source.getSourceMetrics())) {
         while (isReaderRunning()) { // loop here to keep reusing stream while 
we can
           if (!source.isPeerEnabled()) {
             Threads.sleep(sleepForRetries);
@@ -153,9 +143,6 @@ class ReplicationSourceWALReader extends Thread {
         if (sleepMultiplier < maxRetriesMultiplier) {
           LOG.debug("Failed to read stream of replication entries: " + e);
           sleepMultiplier++;
-        } else {
-          LOG.error("Failed to read stream of replication entries", e);
-          handleEofException(e);
         }
         Threads.sleep(sleepForRetries * sleepMultiplier);
       } catch (InterruptedException e) {
@@ -242,24 +229,6 @@ class ReplicationSourceWALReader extends Thread {
     }
   }
 
-  // if we get an EOF due to a zero-length log, and there are other logs in 
queue
-  // (highly likely we've closed the current log), we've hit the max retries, 
and autorecovery is
-  // enabled, then dump the log
-  private void handleEofException(IOException e) {
-    if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
-      logQueue.size() > 1 && this.eofAutoRecovery) {
-      try {
-        if 
(fs.getFileStatus(((FSWALIdentity)logQueue.peek()).getPath()).getLen() == 0) {
-          LOG.warn("Forcing removal of 0 length log in queue: " + 
logQueue.peek());
-          logQueue.remove();
-          currentPosition = 0;
-        }
-      } catch (IOException ioe) {
-        LOG.warn("Couldn't get file length information about log " + 
logQueue.peek());
-      }
-    }
-  }
-
   public WALIdentity getCurrentWalId() {
     // if we've read some WAL entries, get the walId we read from
     WALEntryBatch batchQueueHead = entryBatchQueue.peek();
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
index c7bccb3..e13feeb 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
@@ -21,17 +21,15 @@ import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -75,14 +73,12 @@ public class ReplicationSyncUp extends Configured 
implements Tool {
     Configuration conf = getConf();
     try (ZKWatcher zkw =
       new ZKWatcher(conf, "syncupReplication" + System.currentTimeMillis(), 
abortable, true)) {
-      Path walRootDir = FSUtils.getWALRootDir(conf);
-      FileSystem fs = FSUtils.getWALFileSystem(conf);
-      Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
-      Path logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
-
       System.out.println("Start Replication Server start");
+      DummyServer dummyServer = new DummyServer(zkw);
+      WALFactory factory =
+          new WALFactory(conf, dummyServer.getServerName().toString());
       Replication replication = new Replication();
-      replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir, 
null);
+      replication.initialize(dummyServer, factory.getWALProvider());
       ReplicationSourceManager manager = replication.getReplicationManager();
       manager.init().get();
       while (manager.activeFailoverTaskCount() > 0) {
@@ -99,7 +95,7 @@ public class ReplicationSyncUp extends Configured implements 
Tool {
     return 0;
   }
 
-  class DummyServer implements Server {
+  public class DummyServer implements Server {
     String hostname;
     ZKWatcher zkw;
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java
index 7f73030..ba0c107 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java
@@ -101,12 +101,12 @@ public class ReplicationThrottler {
 
   /**
    * Add current size to the current cycle's total push size
-   * @param size is the current size added to the current cycle's
+   * @param batchSize is the current size added to the current cycle's
    * total push size
    */
-  public void addPushSize(final int size) {
+  public void addPushSize(final long batchSize) {
     if (this.enabled) {
-      this.cyclePushSize += size;
+      this.cyclePushSize += batchSize;
     }
   }
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
index 5f33e73..d246499 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import java.io.IOException;
 import java.util.concurrent.PriorityBlockingQueue;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -43,10 +42,10 @@ public class SerialReplicationSourceWALReader extends 
ReplicationSourceWALReader
 
   private final SerialReplicationChecker checker;
 
-  public SerialReplicationSourceWALReader(FileSystem fs, Configuration conf,
+  public SerialReplicationSourceWALReader(Configuration conf,
       PriorityBlockingQueue<WALIdentity> logQueue, long startPosition, 
WALEntryFilter filter,
       ReplicationSource source) {
-    super(fs, conf, logQueue, startPosition, filter, source);
+    super(conf, logQueue, startPosition, filter, source);
     checker = new SerialReplicationChecker(conf, source);
   }
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index 3d90153..4d89190 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -18,31 +18,12 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
-import java.io.Closeable;
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.OptionalLong;
-import java.util.concurrent.PriorityBlockingQueue;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
-import org.apache.hadoop.hbase.util.CancelableProgressable;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
-import org.apache.hadoop.hbase.wal.FSWALIdentity;
+import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WAL.Reader;
-import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALIdentity;
-import org.apache.hadoop.ipc.RemoteException;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Streaming access to WAL entries. This class is given a queue of WAL {@link 
WALIdentity}, and
@@ -51,385 +32,19 @@ import org.slf4j.LoggerFactory;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-class WALEntryStream implements Closeable {
-  private static final Logger LOG = 
LoggerFactory.getLogger(WALEntryStream.class);
-
-  private Reader reader;
-  private WALIdentity currentWAlIdentity;
-  // cache of next entry for hasNext()
-  private Entry currentEntry;
-  // position for the current entry. As now we support peek, which means that 
the upper layer may
-  // choose to return before reading the current entry, so it is not safe to 
return the value below
-  // in getPosition.
-  private long currentPositionOfEntry = 0;
-  // position after reading current entry
-  private long currentPositionOfReader = 0;
-  private final PriorityBlockingQueue<WALIdentity> logQueue;
-  private final FileSystem fs;
-  private final Configuration conf;
-  private final WALFileLengthProvider walFileLengthProvider;
-  // which region server the WALs belong to
-  private final ServerName serverName;
-  private final MetricsSource metrics;
-
+public interface WALEntryStream extends WAL.Reader {
   /**
-   * Create an entry stream over the given queue at the given start position
-   * @param logQueue the queue of WAL walIds
-   * @param fs {@link FileSystem} to use to create {@link Reader} for this 
stream
-   * @param conf {@link Configuration} to use to create {@link Reader} for 
this stream
-   * @param startPosition the position in the first WAL to start reading at
-   * @param serverName the server name which all WALs belong to
-   * @param metrics replication metrics
-   * @throws IOException
+   * @return the {@link WALIdentity} of the current WAL
    */
-  public WALEntryStream(PriorityBlockingQueue<WALIdentity> logQueue, 
FileSystem fs,
-      Configuration conf, long startPosition, WALFileLengthProvider 
walFileLengthProvider,
-      ServerName serverName, MetricsSource metrics) throws IOException {
-    this.logQueue = logQueue;
-    this.fs = fs;
-    this.conf = conf;
-    this.currentPositionOfEntry = startPosition;
-    this.walFileLengthProvider = walFileLengthProvider;
-    this.serverName = serverName;
-    this.metrics = metrics;
-  }
+  public WALIdentity getCurrentWalIdentity();
 
   /**
    * @return true if there is another WAL {@link Entry}
    */
-  public boolean hasNext() throws IOException {
-    if (currentEntry == null) {
-      tryAdvanceEntry();
-    }
-    return currentEntry != null;
-  }
+  public boolean hasNext() throws IOException;
 
   /**
    * Returns the next WAL entry in this stream but does not advance.
    */
-  public Entry peek() throws IOException {
-    return hasNext() ? currentEntry: null;
-  }
-
-  /**
-   * Returns the next WAL entry in this stream and advance the stream.
-   */
-  public Entry next() throws IOException {
-    Entry save = peek();
-    currentPositionOfEntry = currentPositionOfReader;
-    currentEntry = null;
-    return save;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void close() throws IOException {
-    closeReader();
-  }
-
-  /**
-   * @return the position of the last Entry returned by next()
-   */
-  public long getPosition() {
-    return currentPositionOfEntry;
-  }
-
-  /**
-   * @return the {@link WALIdentity} of the current WAL
-   */
-  public WALIdentity getCurrentWalIdentity() {
-    return currentWAlIdentity;
-  }
-
-  private String getCurrentWalIdStat() {
-    StringBuilder sb = new StringBuilder();
-    if (currentWAlIdentity != null) {
-      sb.append("currently replicating from: 
").append(currentWAlIdentity).append(" at position: ")
-          .append(currentPositionOfEntry).append("\n");
-    } else {
-      sb.append("no replication ongoing, waiting for new log");
-    }
-    return sb.toString();
-  }
-
-  /**
-   * Should be called if the stream is to be reused (i.e. used again after 
hasNext() has returned
-   * false)
-   */
-  public void reset() throws IOException {
-    if (reader != null && currentWAlIdentity != null) {
-      resetReader();
-    }
-  }
-
-  private void setPosition(long position) {
-    currentPositionOfEntry = position;
-  }
-
-  private void setCurrentWalId(WALIdentity walId) {
-    this.currentWAlIdentity = walId;
-  }
-
-  private void tryAdvanceEntry() throws IOException {
-    if (checkReader()) {
-      boolean beingWritten = readNextEntryAndRecordReaderPosition();
-      if (currentEntry == null && !beingWritten) {
-        // no more entries in this log file, and the file is already closed, 
i.e, rolled
-        // Before dequeueing, we should always get one more attempt at reading.
-        // This is in case more entries came in after we opened the reader, 
and the log is rolled
-        // while we were reading. See HBASE-6758
-        resetReader();
-        readNextEntryAndRecordReaderPosition();
-        if (currentEntry == null) {
-          if (checkAllBytesParsed()) { // now we're certain we're done with 
this log file
-            dequeueCurrentLog();
-            if (openNextLog()) {
-              readNextEntryAndRecordReaderPosition();
-            }
-          }
-        }
-      }
-      // if currentEntry != null then just return
-      // if currentEntry == null but the file is still being written, then we 
should not switch to
-      // the next log either, just return here and try next time to see if 
there are more entries in
-      // the current file
-    }
-    // do nothing if we don't have a WAL Reader (e.g. if there's no logs in 
queue)
-  }
-
-  // HBASE-15984 check to see we have in fact parsed all data in a cleanly 
closed file
-  private boolean checkAllBytesParsed() throws IOException {
-    // -1 means the wal wasn't closed cleanly.
-    final long trailerSize = currentTrailerSize();
-    FileStatus stat = null;
-    try {
-      stat = 
fs.getFileStatus(((FSWALIdentity)this.currentWAlIdentity).getPath());
-    } catch (IOException exception) {
-      LOG.warn("Couldn't get file length information about log {}, it {} 
closed cleanly {}",
-        currentWAlIdentity, trailerSize < 0 ? "was not" : "was", 
getCurrentWalIdStat());
-      metrics.incrUnknownFileLengthForClosedWAL();
-    }
-    // Here we use currentPositionOfReader instead of currentPositionOfEntry.
-    // We only call this method when currentEntry is null so usually they are 
the same, but there
-    // are two exceptions. One is we have nothing in the file but only a 
header, in this way
-    // the currentPositionOfEntry will always be 0 since we have no change to 
update it. The other
-    // is that we reach the end of file, then currentPositionOfEntry will 
point to the tail of the
-    // last valid entry, and the currentPositionOfReader will usually point to 
the end of the file.
-    if (stat != null) {
-      if (trailerSize < 0) {
-        if (currentPositionOfReader < stat.getLen()) {
-          final long skippedBytes = stat.getLen() - currentPositionOfReader;
-          LOG.debug(
-            "Reached the end of WAL file '{}'. It was not closed cleanly," +
-              " so we did not parse {} bytes of data. This is normally ok.",
-            currentWAlIdentity, skippedBytes);
-          metrics.incrUncleanlyClosedWALs();
-          metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
-        }
-      } else if (currentPositionOfReader + trailerSize < stat.getLen()) {
-        LOG.warn(
-          "Processing end of WAL file '{}'. At position {}, which is too far 
away from" +
-            " reported file length {}. Restarting WAL reading (see HBASE-15983 
for details). {}",
-          currentWAlIdentity, currentPositionOfReader, stat.getLen(), 
getCurrentWalIdStat());
-        setPosition(0);
-        resetReader();
-        metrics.incrRestartedWALReading();
-        metrics.incrRepeatedFileBytes(currentPositionOfReader);
-        return false;
-      }
-    }
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Reached the end of log " + this.currentWAlIdentity
-          + ", and the length of the file is " + (stat == null ? "N/A" : 
stat.getLen()));
-    }
-    metrics.incrCompletedWAL();
-    return true;
-  }
-
-  private void dequeueCurrentLog() throws IOException {
-    LOG.debug("Reached the end of log {}", currentWAlIdentity);
-    closeReader();
-    logQueue.remove();
-    setPosition(0);
-    metrics.decrSizeOfLogQueue();
-  }
-
-  /**
-   * Returns whether the file is opened for writing.
-   */
-  private boolean readNextEntryAndRecordReaderPosition() throws IOException {
-    Entry readEntry = reader.next();
-    long readerPos = reader.getPosition();
-    OptionalLong fileLength =
-        walFileLengthProvider.getLogFileSizeIfBeingWritten(currentWAlIdentity);
-    if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) {
-      // see HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible 
that we read uncommitted
-      // data, so we need to make sure that we do not read beyond the 
committed file length.
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("The provider tells us the valid length for " + 
currentWAlIdentity + " is " +
-            fileLength.getAsLong() + ", but we have advanced to " + readerPos);
-      }
-      resetReader();
-      return true;
-    }
-    if (readEntry != null) {
-      metrics.incrLogEditsRead();
-      metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry);
-    }
-    currentEntry = readEntry; // could be null
-    this.currentPositionOfReader = readerPos;
-    return fileLength.isPresent();
-  }
-
-  private void closeReader() throws IOException {
-    if (reader != null) {
-      reader.close();
-      reader = null;
-    }
-  }
-
-  // if we don't have a reader, open a reader on the next log
-  private boolean checkReader() throws IOException {
-    if (reader == null) {
-      return openNextLog();
-    }
-    return true;
-  }
-
-  // open a reader on the next log in queue
-  private boolean openNextLog() throws IOException {
-    WALIdentity nextWalId = logQueue.peek();
-    if (nextWalId != null) {
-      openReader((FSWALIdentity)nextWalId);
-      if (reader != null) {
-        return true;
-      }
-    } else {
-      // no more files in queue, this could happen for recovered queue, or for 
a wal group of a sync
-      // replication peer which has already been transited to DA or S.
-      setCurrentWalId(null);
-    }
-    return false;
-  }
-
-  private Path getArchivedLog(Path path) throws IOException {
-    Path rootDir = FSUtils.getRootDir(conf);
-
-    // Try found the log in old dir
-    Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
-    Path archivedLogLocation = new Path(oldLogDir, path.getName());
-    if (fs.exists(archivedLogLocation)) {
-      LOG.info("Log " + path + " was moved to " + archivedLogLocation);
-      return archivedLogLocation;
-    }
-
-    // Try found the log in the seperate old log dir
-    oldLogDir =
-        new Path(rootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME)
-            
.append(Path.SEPARATOR).append(serverName.getServerName()).toString());
-    archivedLogLocation = new Path(oldLogDir, path.getName());
-    if (fs.exists(archivedLogLocation)) {
-      LOG.info("Log " + path + " was moved to " + archivedLogLocation);
-      return archivedLogLocation;
-    }
-
-    LOG.error("Couldn't locate log: " + path);
-    return path;
-  }
-
-  private void handleFileNotFound(FSWALIdentity walId, FileNotFoundException 
fnfe)
-      throws IOException {
-    // If the log was archived, continue reading from there
-    FSWALIdentity archivedLog = new 
FSWALIdentity(getArchivedLog(walId.getPath()));
-    if (!walId.equals(archivedLog)) {
-      openReader(archivedLog);
-    } else {
-      throw fnfe;
-    }
-  }
-
-  private void openReader(FSWALIdentity walId) throws IOException {
-    try {
-      // Detect if this is a new file, if so get a new reader else
-      // reset the current reader so that we see the new data
-      if (reader == null || !getCurrentWalIdentity().equals(walId)) {
-        closeReader();
-        reader = WALFactory.createReader(fs, walId.getPath(), conf);
-        seek();
-        setCurrentWalId(walId);
-      } else {
-        resetReader();
-      }
-    } catch (FileNotFoundException fnfe) {
-      handleFileNotFound(walId, fnfe);
-    }  catch (RemoteException re) {
-      IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
-      if (!(ioe instanceof FileNotFoundException)) throw ioe;
-      handleFileNotFound(walId, (FileNotFoundException)ioe);
-    } catch (LeaseNotRecoveredException lnre) {
-      // HBASE-15019 the WAL was not closed due to some hiccup.
-      LOG.warn("Try to recover the WAL lease " + currentWAlIdentity, lnre);
-      recoverLease(conf, ((FSWALIdentity)currentWAlIdentity).getPath());
-      reader = null;
-    } catch (NullPointerException npe) {
-      // Workaround for race condition in HDFS-4380
-      // which throws a NPE if we open a file before any data node has the 
most recent block
-      // Just sleep and retry. Will require re-reading compressed WALs for 
compressionContext.
-      LOG.warn("Got NPE opening reader, will retry.");
-      reader = null;
-    }
-  }
-
-  // For HBASE-15019
-  private void recoverLease(final Configuration conf, final Path path) {
-    try {
-      final FileSystem dfs = FSUtils.getCurrentFileSystem(conf);
-      FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
-      fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
-        @Override
-        public boolean progress() {
-          LOG.debug("recover WAL lease: " + path);
-          return true;
-        }
-      });
-    } catch (IOException e) {
-      LOG.warn("unable to recover lease for WAL: " + path, e);
-    }
-  }
-
-  private void resetReader() throws IOException {
-    try {
-      currentEntry = null;
-      reader.reset();
-      seek();
-    } catch (FileNotFoundException fnfe) {
-      // If the log was archived, continue reading from there
-      FSWALIdentity archivedLog =
-          new FSWALIdentity(getArchivedLog(((FSWALIdentity) 
currentWAlIdentity).getPath()));
-      if (!currentWAlIdentity.equals(archivedLog)) {
-        openReader(archivedLog);
-      } else {
-        throw fnfe;
-      }
-    } catch (NullPointerException npe) {
-      throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
-    }
-  }
-
-  private void seek() throws IOException {
-    if (currentPositionOfEntry != 0) {
-      reader.seek(currentPositionOfEntry);
-    }
-  }
-
-  private long currentTrailerSize() {
-    long size = -1L;
-    if (reader instanceof ProtobufLogReader) {
-      final ProtobufLogReader pblr = (ProtobufLogReader) reader;
-      size = pblr.trailerSize();
-    }
-    return size;
-  }
+  public Entry peek() throws IOException;
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java
index d0b63cc..08bff9b 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java
@@ -18,8 +18,7 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.util.OptionalLong;
-
-import org.apache.hadoop.hbase.wal.WALIdentity;
+import org.apache.hadoop.fs.Path;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -30,5 +29,5 @@ import org.apache.yetus.audience.InterfaceAudience;
 @FunctionalInterface
 public interface WALFileLengthProvider {
 
-  OptionalLong getLogFileSizeIfBeingWritten(WALIdentity walId);
+  OptionalLong getLogFileSizeIfBeingWritten(Path walId);
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index 1f24548..e5303b0 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -23,28 +23,36 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.replication.regionserver.FSWALEntryStream;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
+import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 
@@ -62,6 +70,8 @@ import 
org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 @InterfaceStability.Evolving
 public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> 
implements WALProvider {
 
+  // Fields rootDir (base dir containing per-regionserver wal directories) and oldLogDir
+  // (archive dir rolled wals move into) are declared further down in this class.
   private static final Logger LOG = 
LoggerFactory.getLogger(AbstractFSWALProvider.class);
 
   /** Separate old log into different dir by regionserver name **/
@@ -94,6 +104,12 @@ public abstract class AbstractFSWALProvider<T extends 
AbstractFSWAL<?>> implemen
    */
   private final ReadWriteLock walCreateLock = new ReentrantReadWriteLock();
 
+  private Path rootDir;
+
+  private Path oldLogDir;
+
+  private FileSystem fs;
+
   /**
    * @param factory factory that made us, identity used for FS layout. may not 
be null
    * @param conf may not be null
@@ -118,6 +134,9 @@ public abstract class AbstractFSWALProvider<T extends 
AbstractFSWAL<?>> implemen
       }
     }
     logPrefix = sb.toString();
+    rootDir = FSUtils.getRootDir(conf);
+    oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    this.fs = CommonFSUtils.getWALFileSystem(conf);
     doInit(conf);
   }
 
@@ -554,4 +573,87 @@ public abstract class AbstractFSWALProvider<T extends 
AbstractFSWAL<?>> implemen
   public static long getWALStartTimeFromWALName(String name) {
     return Long.parseLong(getWALNameGroupFromWALName(name, 2));
   }
+
+  @Override
+  public WALEntryStream getWalStream(PriorityBlockingQueue<WALIdentity> 
logQueue,
+      Configuration conf, long startPosition, ServerName serverName, 
MetricsSource metrics)
+      throws IOException {
+    return new FSWALEntryStream(fs, logQueue, conf, startPosition,
+        serverName, metrics, this);
+  }
+
+  @Override
+  public WALIdentity createWalIdentity(ServerName serverName, String walName, 
boolean isArchive) {
+    Path walPath;
+    if (isArchive) {
+      walPath = new Path(oldLogDir, walName);
+    } else {
+      Path logDir =
+          new Path(rootDir, 
AbstractFSWALProvider.getWALDirectoryName(serverName.toString()));
+      walPath = new Path(logDir, walName);
+    }
+    return new FSWALIdentity(walPath);
+  }
+
  /**
   * Resolves the current location of a wal that may have moved since {@code walId} was
   * recorded (e.g. after its region server died and the wal was relocated or is being
   * split).
   *
   * <p>Lookup order: (1) the original path, if it still exists; (2) for the
   * ReplicationSyncUp tool, a scan of every region server's wal directory; (3) the wal
   * directories (and their ".splitting" variants) of each dead region server. If nothing
   * matches, the original identity is returned unchanged after logging an error.
   *
   * @param walId original wal identity (must be an {@link FSWALIdentity})
   * @param server the local server context; a {@code ReplicationSyncUp.DummyServer}
   *          triggers the sync-up scan path
   * @param deadRegionServers chain of dead servers whose wal dirs may now hold the file
   * @return the relocated identity, or {@code walId} itself if no new location was found
   * @throws IOException on filesystem errors during the existence checks
   */
  @Override
  public WALIdentity locateWalId(WALIdentity walId, Server server,
      List<ServerName> deadRegionServers) throws IOException {
    FSWALIdentity fsWALId = ((FSWALIdentity) walId);
    if (fs.exists(fsWALId.getPath())) {
      // still in same location, don't need to do anything
      return fsWALId;
    }
    // Path changed - try to find the right path.
    if (server instanceof ReplicationSyncUp.DummyServer) {
      // In the case of disaster/recovery, HMaster may be shutdown/crashed before flushing
      // data from .logs to .oldlogs. Loop over the .logs folders and check whether a
      // match exists.
      Path newPath = getReplSyncUpPath(server.getServerName(), ((FSWALIdentity) fsWALId).getPath());
      return new FSWALIdentity(newPath);
    } else {
      // See if the Path exists in a dead RS folder (there could be a chain of failures
      // to look at).
      LOG.info("NB dead servers : " + deadRegionServers.size());

      for (ServerName curDeadServerName : deadRegionServers) {
        final Path deadRsDirectory = new Path(rootDir,
            AbstractFSWALProvider.getWALDirectoryName(curDeadServerName.getServerName()));
        // Check both the dead server's live wal dir and its ".splitting" dir, since the
        // wal may currently be under log-splitting.
        Path[] locs = new Path[] { new Path(deadRsDirectory, fsWALId.getName()), new Path(
            deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), fsWALId.getName()) };
        for (Path possibleLogLocation : locs) {
          LOG.info("Possible location " + possibleLogLocation.toUri().toString());
          if (fs.exists(possibleLogLocation)) {
            // We found the right new location
            LOG.info("Log " + fsWALId + " still exists at " + possibleLogLocation);
            return new FSWALIdentity(possibleLogLocation);
          }
        }
      }
      // didn't find a new location; hand the original back so the caller can decide
      LOG.error(
        String.format("WAL Path %s doesn't exist and couldn't find its new location", fsWALId));
      return fsWALId;
    }
  }
+
+  // N.B. the ReplicationSyncUp tool sets the manager.getWALDir to the root of 
the wal
+  // area rather than to the wal area for a particular region server.
+  private Path getReplSyncUpPath(ServerName server, Path path) throws 
IOException {
+    Path logDir = new Path(rootDir,
+        
AbstractFSWALProvider.getWALDirectoryName(server.getServerName().toString()));
+    FileStatus[] rss = fs.listStatus(logDir);
+    for (FileStatus rs : rss) {
+      Path p = rs.getPath();
+      FileStatus[] logs = fs.listStatus(p);
+      for (FileStatus log : logs) {
+        p = new Path(p, log.getPath().getName());
+        if (p.getName().equals(path.getName())) {
+          LOG.info("Log " + p.getName() + " found at " + p);
+          return p;
+        }
+      }
+    }
+    LOG.error("Didn't find path for: " + path.getName());
+    return path;
+  }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index 8822f29..1099b07 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -24,16 +24,22 @@ import java.util.Map;
 import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import 
org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
+import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -267,7 +273,7 @@ class DisabledWALProvider implements WALProvider {
     }
 
     @Override
-    public OptionalLong getLogFileSizeIfBeingWritten(WALIdentity path) {
+    public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
       return OptionalLong.empty();
     }
   }
@@ -286,4 +292,87 @@ class DisabledWALProvider implements WALProvider {
   public void addWALActionsListener(WALActionsListener listener) {
     disabled.registerWALActionsListener(listener);
   }
+
+  @Override
+  public WALEntryStream getWalStream(PriorityBlockingQueue<WALIdentity> 
logQueue,
+      Configuration conf, long startPosition, ServerName serverName, 
MetricsSource metrics)
+      throws IOException {
+    return new WALEntryStream() {
+
+      @Override
+      public void close() throws IOException {
+      }
+
+      @Override
+      public void seek(long pos) throws IOException {
+      }
+
+      @Override
+      public void reset() throws IOException {
+      }
+
+      @Override
+      public Entry next(Entry reuse) throws IOException {
+        return null;
+      }
+
+      @Override
+      public Entry next() throws IOException {
+        return null;
+      }
+
+      @Override
+      public long getPosition() throws IOException {
+        return 0;
+      }
+
+      @Override
+      public Entry peek() throws IOException {
+        return null;
+      }
+
+      @Override
+      public boolean hasNext() throws IOException {
+        return false;
+      }
+
+      @Override
+      public WALIdentity getCurrentWalIdentity() {
+        return null;
+      }
+    };
+  }
+
+  @Override
+  public WALIdentity createWalIdentity(ServerName serverName, String walName, 
boolean isArchive) {
+    return new WALIdentity() {
+
+      @Override
+      public int compareTo(WALIdentity o) {
+        return 0;
+      }
+
+      @Override
+      public String getName() {
+        return walName;
+      }
+
+      @Override
+      public boolean equals(Object obj) {
+        return obj instanceof WALIdentity;
+      }
+
+      @Override
+      public int hashCode() {
+        return 0;
+      }
+    };
+  }
+
  /**
   * No relocation is possible (or needed) when the WAL is disabled; the identity is
   * returned unchanged regardless of the dead-server list.
   */
  @Override
  public WALIdentity locateWalId(WALIdentity wal, Server server, List<ServerName> deadRegionServers)
      throws IOException {
    return wal;
  }
+
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
index 0b7b8da..8cd667a 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
@@ -26,15 +26,22 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.locks.Lock;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
 // imports for classes still in regionserver.wal
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.replication.regionserver.FSWALEntryStream;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
+import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.KeyLocker;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -88,6 +95,7 @@ public class RegionGroupingProvider implements WALProvider {
     }
   }
 
+
   /**
    * instantiate a strategy from a config property.
    * requires conf to have already been set (as well as anything the provider 
might need to read).
@@ -137,6 +145,7 @@ public class RegionGroupingProvider implements WALProvider {
   private List<WALActionsListener> listeners = new ArrayList<>();
   private String providerId;
   private Class<? extends WALProvider> providerClass;
+  private WALProvider delegateProvider;
 
   @Override
   public void init(WALFactory factory, Configuration conf, String providerId) 
throws IOException {
@@ -156,6 +165,8 @@ public class RegionGroupingProvider implements WALProvider {
     this.providerId = sb.toString();
     this.strategy = getStrategy(conf, REGION_GROUPING_STRATEGY, 
DEFAULT_REGION_GROUPING_STRATEGY);
     this.providerClass = factory.getProviderClass(DELEGATE_PROVIDER, 
DEFAULT_DELEGATE_PROVIDER);
+    delegateProvider = WALFactory.createProvider(providerClass);
+    delegateProvider.init(factory, conf, providerId);
   }
 
   private WALProvider createProvider(String group) throws IOException {
@@ -285,4 +296,24 @@ public class RegionGroupingProvider implements WALProvider 
{
     // extra code actually works, then we will have other big problems. So 
leave it as is.
     listeners.add(listener);
   }
+
+  @Override
+  public WALEntryStream getWalStream(PriorityBlockingQueue<WALIdentity> 
logQueue,
+      Configuration conf, long startPosition, ServerName serverName, 
MetricsSource metrics)
+      throws IOException {
+    return new FSWALEntryStream(CommonFSUtils.getWALFileSystem(conf), 
logQueue, conf, startPosition,
+      serverName, metrics, this);
+  }
+
  /** Delegates identity creation to the configured delegate provider. */
  @Override
  public WALIdentity createWalIdentity(ServerName serverName, String walName, boolean isArchive) {
    return delegateProvider.createWalIdentity(serverName, walName, isArchive);
  }
+
  /** Delegates wal relocation to the configured delegate provider. */
  @Override
  public WALIdentity locateWalId(WALIdentity wal, Server server, List<ServerName> deadRegionServers)
      throws IOException {
    return delegateProvider.locateWalId(wal, server, deadRegionServers);
  }
+
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
index 9859c20..4d0df91 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.function.BiPredicate;
@@ -36,14 +37,19 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.replication.regionserver.FSWALEntryStream;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
 import org.apache.hadoop.hbase.replication.regionserver.PeerActionListener;
 import 
org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider;
+import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.KeyLocker;
@@ -349,4 +355,23 @@ public class SyncReplicationWALProvider implements 
WALProvider, PeerActionListen
     return provider;
   }
 
  /**
   * Opens a filesystem-backed entry stream over the queued wals, reading from the wal
   * filesystem resolved from {@code conf} and starting within the first wal at
   * {@code startPosition}.
   */
  // NOTE(review): unlike createWalIdentity/locateWalId this does not delegate to the
  // wrapped provider — confirm that a hard-wired FSWALEntryStream is intended here.
  @Override
  public WALEntryStream getWalStream(PriorityBlockingQueue<WALIdentity> logQueue,
      Configuration conf, long startPosition, ServerName serverName, MetricsSource metrics)
      throws IOException {
    return new FSWALEntryStream(CommonFSUtils.getWALFileSystem(conf), logQueue, conf, startPosition,
        serverName, metrics, this);
  }
+
  /** Delegates identity creation to the wrapped provider. */
  @Override
  public WALIdentity createWalIdentity(ServerName serverName, String walName, boolean isArchive) {
    return provider.createWalIdentity(serverName, walName, isArchive);
  }
+
  /** Delegates wal relocation to the wrapped provider. */
  @Override
  public WALIdentity locateWalId(WALIdentity wal, Server server, List<ServerName> deadRegionServers)
      throws IOException {
    return provider.locateWalId(wal, server, deadRegionServers);
  }
+
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
index 244a636..3ef4efa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
@@ -21,12 +21,15 @@ package org.apache.hadoop.hbase.wal;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
-import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.PriorityBlockingQueue;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
-import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
+import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -109,8 +112,37 @@ public interface WALProvider {
    */
   void addWALActionsListener(WALActionsListener listener);
 
-  default WALFileLengthProvider getWALFileLengthProvider() {
-    return path -> getWALs().stream().map(w -> 
w.getLogFileSizeIfBeingWritten(path))
-        .filter(o -> o.isPresent()).findAny().orElse(OptionalLong.empty());
-  }
+  /**
+   * Obtains a streaming reader over the WAL entries of the given set of wals, consuming
+   * the supplied queue of wal identities in order.
+   * queue of WAL
+   * @param logQueue Queue of wals
+   * @param conf configuration
+   * @param startPosition start position for the first wal in the queue
+   * @param serverName name of the server
+   * @param metrics metric source
+   * @return WALEntryStream
+   * @throws IOException IOException
+   */
+  WALEntryStream getWalStream(PriorityBlockingQueue<WALIdentity> logQueue, 
Configuration conf,
+      long startPosition, ServerName serverName, MetricsSource metrics) throws 
IOException;
+
+  /**
+   * Create wal identity wrapper for wal Name
+   * @param serverName regionserver name
+   * @param walName Name of the wal
+   * @param isArchive whether the wal lives in the archive (old-logs) directory
+   * @return WALIdentity
+   */
+  WALIdentity createWalIdentity(ServerName serverName, String walName, boolean 
isArchive);
+
+  /**
+   * Get the updated walId if it is moved after the server is dead
+   * @param wal original walId
+   * @param server the local server context performing the lookup (not the dead server)
+   * @param deadRegionServers list of dead region servers
+   * @return updated walId after relocation
+   * @throws IOException IOException
+   */
+  WALIdentity locateWalId(WALIdentity wal, Server server,
+      List<ServerName> deadRegionServers) throws IOException;
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index ed71e6e..ec0ed40 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -21,9 +21,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
@@ -31,10 +29,10 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
 import 
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
 import 
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
-import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALIdentity;
+import org.apache.hadoop.hbase.wal.WALProvider;
 
 /**
  * Source that does nothing at all, helpful to test ReplicationSourceManager
@@ -46,18 +44,15 @@ public class ReplicationSourceDummy implements 
ReplicationSourceInterface {
   private String peerClusterId;
   private WALIdentity currentWalId;
   private MetricsSource metrics;
-  private WALFileLengthProvider walFileLengthProvider;
   private AtomicBoolean startup = new AtomicBoolean(false);
 
   @Override
-  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager 
manager,
-      ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String 
peerClusterId,
-      UUID clusterId, WALFileLengthProvider walFileLengthProvider, 
MetricsSource metrics)
-      throws IOException {
+  public void init(Configuration conf, ReplicationSourceManager manager, 
ReplicationQueueStorage rq,
+      ReplicationPeer rp, Server server, String peerClusterId, UUID clusterId,
+      MetricsSource metrics, WALProvider walProvider) throws IOException {
     this.manager = manager;
     this.peerClusterId = peerClusterId;
     this.metrics = metrics;
-    this.walFileLengthProvider = walFileLengthProvider;
     this.replicationPeer = rp;
   }
 
@@ -144,12 +139,7 @@ public class ReplicationSourceDummy implements 
ReplicationSourceInterface {
   }
 
   @Override
-  public void postShipEdits(List<Entry> entries, int batchSize) {
-  }
-
-  @Override
-  public WALFileLengthProvider getWALFileLengthProvider() {
-    return walFileLengthProvider;
+  public void postShipEdits(List<Entry> entries, long batchSize) {
   }
 
   @Override
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
index 2320205..6322f79 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
 import java.io.IOException;
-import java.util.OptionalLong;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -48,12 +47,6 @@ import 
org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
-import 
org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
-import 
org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
-import 
org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSourceShipper;
-import org.apache.hadoop.hbase.replication.regionserver.Replication;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
-import 
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -176,8 +169,7 @@ public class TestReplicationSource {
     testConf.setInt("replication.source.maxretriesmultiplier", 1);
     ReplicationSourceManager manager = 
Mockito.mock(ReplicationSourceManager.class);
     Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
-    source.init(testConf, null, manager, null, mockPeer, null, "testPeer", 
null,
-      p -> OptionalLong.empty(), null);
+    source.init(testConf, manager, null, mockPeer, null, "testPeer", null, 
null, null);
     ExecutorService executor = Executors.newSingleThreadExecutor();
     Future<?> future = executor.submit(new Runnable() {
 
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index def737e..f816ea6 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -77,6 +77,7 @@ import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
 import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage;
 import 
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
+import 
org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp.DummyServer;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -90,6 +91,7 @@ import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALIdentity;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
@@ -196,7 +198,10 @@ public abstract class TestReplicationSourceManager {
     logDir = utility.getDataTestDir(HConstants.HREGION_LOGDIR_NAME);
     remoteLogDir = 
utility.getDataTestDir(ReplicationUtils.REMOTE_WAL_DIR_NAME);
     replication = new Replication();
-    replication.initialize(new DummyServer(), fs, logDir, oldLogDir, null);
+    DummyServer dummyServer = new DummyServer();
+    WALFactory factory =
+        new WALFactory(conf, dummyServer.getServerName().toString());
+    replication.initialize(dummyServer, factory.getWALProvider());
     managerOfCluster = getManagerFromCluster();
     if (managerOfCluster != null) {
       // After replication procedure, we need to add peer by hand (other than 
by receiving
@@ -822,10 +827,9 @@ public abstract class TestReplicationSourceManager {
   static class FailInitializeDummyReplicationSource extends 
ReplicationSourceDummy {
 
     @Override
-    public void init(Configuration conf, FileSystem fs, 
ReplicationSourceManager manager,
+    public void init(Configuration conf, ReplicationSourceManager manager,
         ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String 
peerClusterId,
-        UUID clusterId, WALFileLengthProvider walFileLengthProvider, 
MetricsSource metrics)
-        throws IOException {
+        UUID clusterId, MetricsSource metrics, WALProvider provider) throws 
IOException {
       throw new IOException("Failing deliberately");
     }
   }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationThrottler.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationThrottler.java
index c4d529e..6e38016 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationThrottler.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationThrottler.java
@@ -58,8 +58,8 @@ public class TestReplicationThrottler {
     assertEquals(0, ticks1);
     assertEquals(0, ticks2);
 
-    throttler1.addPushSize(1000);
-    throttler2.addPushSize(1000);
+    throttler1.addPushSize(1000L);
+    throttler2.addPushSize(1000L);
 
     ticks1 = throttler1.getNextSleepInterval(5);
     ticks2 = throttler2.getNextSleepInterval(5);
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index d22f96a..4f2f163 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALIdentity;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -106,6 +107,8 @@ public class TestWALEntryStream {
   public TestName tn = new TestName();
   private final MultiVersionConcurrencyControl mvcc = new 
MultiVersionConcurrencyControl();
 
+  private WALProvider walProvider;
+
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL = new HBaseTestingUtility();
@@ -125,8 +128,9 @@ public class TestWALEntryStream {
   public void setUp() throws Exception {
     walQueue = new PriorityBlockingQueue<>();
     pathWatcher = new PathWatcher();
-    final WALFactory wals = new WALFactory(CONF, tn.getMethodName());
-    wals.getWALProvider().addWALActionsListener(pathWatcher);
+    WALFactory wals = new WALFactory(CONF, tn.getMethodName());
+    walProvider = wals.getWALProvider();
+    walProvider.addWALActionsListener(pathWatcher);
     log = wals.getWAL(info);
   }
 
@@ -156,8 +160,8 @@ public class TestWALEntryStream {
 
           log.rollWriter();
 
-          try (WALEntryStream entryStream =
-              new WALEntryStream(walQueue, fs, CONF, 0, log, null, new 
MetricsSource("1"))) {
+          try (WALEntryStream entryStream = new FSWALEntryStream(fs, walQueue, 
CONF, 0, null,
+              new MetricsSource("1"), this.walProvider)) {
             int i = 0;
             while (entryStream.hasNext()) {
               assertNotNull(entryStream.next());
@@ -184,7 +188,7 @@ public class TestWALEntryStream {
     appendToLogAndSync();
     long oldPos;
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, CONF, 0, log, null, new 
MetricsSource("1"))) {
+        new FSWALEntryStream(fs, walQueue, CONF, 0, null, new 
MetricsSource("1"), walProvider)) {
       // There's one edit in the log, read it. Reading past it needs to throw 
exception
       assertTrue(entryStream.hasNext());
       WAL.Entry entry = entryStream.peek();
@@ -198,8 +202,8 @@ public class TestWALEntryStream {
 
     appendToLogAndSync();
 
-    try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, 
oldPos,
-        log, null, new MetricsSource("1"))) {
+    try (WALEntryStream entryStream = new FSWALEntryStream(fs, walQueue, CONF, 
oldPos, null,
+        new MetricsSource("1"), walProvider)) {
       // Read the newly added entry, make sure we made progress
       WAL.Entry entry = entryStream.next();
       assertNotEquals(oldPos, entryStream.getPosition());
@@ -212,8 +216,8 @@ public class TestWALEntryStream {
     log.rollWriter();
     appendToLogAndSync();
 
-    try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, 
oldPos,
-        log, null, new MetricsSource("1"))) {
+    try (WALEntryStream entryStream = new FSWALEntryStream(fs, walQueue, CONF, 
oldPos, null,
+        new MetricsSource("1"), walProvider)) {
       WAL.Entry entry = entryStream.next();
       assertNotEquals(oldPos, entryStream.getPosition());
       assertNotNull(entry);
@@ -238,7 +242,7 @@ public class TestWALEntryStream {
     appendToLog("1");
     appendToLog("2");// 2
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, CONF, 0, log, null, new 
MetricsSource("1"))) {
+        new FSWALEntryStream(fs, walQueue, CONF, 0, null, new 
MetricsSource("1"), walProvider)) {
       assertEquals("1", getRow(entryStream.next()));
 
       appendToLog("3"); // 3 - comes in after reader opened
@@ -263,7 +267,7 @@ public class TestWALEntryStream {
   public void testNewEntriesWhileStreaming() throws Exception {
     appendToLog("1");
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, CONF, 0, log, null, new 
MetricsSource("1"))) {
+        new FSWALEntryStream(fs, walQueue, CONF, 0, null, new 
MetricsSource("1"), walProvider)) {
       entryStream.next(); // we've hit the end of the stream at this point
 
       // some new entries come in while we're streaming
@@ -286,15 +290,15 @@ public class TestWALEntryStream {
     long lastPosition = 0;
     appendToLog("1");
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
+        new FSWALEntryStream(fs, walQueue, CONF, 0, null, new MetricsSource("1"), walProvider)) {
       entryStream.next(); // we've hit the end of the stream at this point
       appendToLog("2");
       appendToLog("3");
       lastPosition = entryStream.getPosition();
     }
     // next stream should picks up where we left off
-    try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, CONF, lastPosition, log, null, new MetricsSource("1"))) {
+    try (WALEntryStream entryStream = new FSWALEntryStream(fs, walQueue, CONF, lastPosition, null,
+        new MetricsSource("1"), walProvider)) {
       assertEquals("2", getRow(entryStream.next()));
       assertEquals("3", getRow(entryStream.next()));
       assertFalse(entryStream.hasNext()); // done
@@ -311,14 +315,14 @@ public class TestWALEntryStream {
     long lastPosition = 0;
     appendEntriesToLogAndSync(3);
     // read only one element
-    try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, lastPosition,
-        log, null, new MetricsSource("1"))) {
+    try (WALEntryStream entryStream = new FSWALEntryStream(fs, walQueue, CONF, lastPosition, null,
+        new MetricsSource("1"), walProvider)) {
       entryStream.next();
       lastPosition = entryStream.getPosition();
     }
     // there should still be two more entries from where we left off
-    try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, CONF, lastPosition, log, null, new MetricsSource("1"))) {
+    try (WALEntryStream entryStream = new FSWALEntryStream(fs, walQueue, CONF, lastPosition, null,
+        new MetricsSource("1"), walProvider)) {
       assertNotNull(entryStream.next());
       assertNotNull(entryStream.next());
       assertFalse(entryStream.hasNext());
@@ -329,7 +333,7 @@ public class TestWALEntryStream {
   @Test
   public void testEmptyStream() throws Exception {
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
+        new FSWALEntryStream(fs, walQueue, CONF, 0, null, new MetricsSource("1"), walProvider)) {
       assertFalse(entryStream.hasNext());
     }
   }
@@ -341,9 +345,9 @@ public class TestWALEntryStream {
     ReplicationSource source = Mockito.mock(ReplicationSource.class);
     when(source.getSourceManager()).thenReturn(mockSourceManager);
     when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
-    when(source.getWALFileLengthProvider()).thenReturn(log);
     when(source.getServer()).thenReturn(mockServer);
     when(source.isRecovered()).thenReturn(recovered);
+    when(source.getWalProvider()).thenReturn(this.walProvider);
     return source;
   }
 
@@ -351,7 +355,7 @@ public class TestWALEntryStream {
     ReplicationSource source = mockReplicationSource(recovered, conf);
     when(source.isPeerEnabled()).thenReturn(true);
     ReplicationSourceWALReader reader =
-      new ReplicationSourceWALReader(fs, conf, walQueue, 0, getDummyFilter(), source);
+        new ReplicationSourceWALReader(conf, walQueue, 0, getDummyFilter(), source);
     reader.start();
     return reader;
   }
@@ -362,7 +366,7 @@ public class TestWALEntryStream {
     // get ending position
     long position;
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
+        new FSWALEntryStream(fs, walQueue, CONF, 0, null, new MetricsSource("1"), walProvider)) {
       entryStream.next();
       entryStream.next();
       entryStream.next();
@@ -476,8 +480,8 @@ public class TestWALEntryStream {
     appendEntriesToLogAndSync(3);
     // get ending position
     long position;
-    try (WALEntryStream entryStream =
-      new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
+    try (WALEntryStream entryStream = new FSWALEntryStream(fs, walQueue, CONF, 0, null,
+        new MetricsSource("1"), this.walProvider)) {
       entryStream.next();
       entryStream.next();
       entryStream.next();
@@ -495,7 +499,7 @@ public class TestWALEntryStream {
     });
 
     ReplicationSourceWALReader reader =
-      new ReplicationSourceWALReader(fs, CONF, walQueue, 0, getDummyFilter(), source);
+      new ReplicationSourceWALReader(CONF, walQueue, 0, getDummyFilter(), source);
     reader.start();
     Future<WALEntryBatch> future = ForkJoinPool.commonPool().submit(() -> {
       return reader.take();
@@ -591,10 +595,15 @@ public class TestWALEntryStream {
   public void testReadBeyondCommittedLength() throws IOException, InterruptedException {
     appendToLog("1");
     appendToLog("2");
-    long size = log.getLogFileSizeIfBeingWritten(walQueue.peek()).getAsLong();
+    long size =
+        log.getLogFileSizeIfBeingWritten(((FSWALIdentity) walQueue.peek()).getPath()).getAsLong();
     AtomicLong fileLength = new AtomicLong(size - 1);
-    try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, 0,
-        p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"))) {
+    try (WALEntryStream entryStream =
+        new FSWALEntryStream(fs, walQueue, CONF, 0, null, new MetricsSource("1"), walProvider) {
+          public WALFileLengthProvider getWALFileLengthProvider() {
+            return p -> OptionalLong.of(fileLength.get());
+          }
+        }) {
       assertTrue(entryStream.hasNext());
       assertNotNull(entryStream.next());
       // can not get log 2
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
index d062c77..e2cf6f5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
@@ -27,18 +27,24 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 // imports for things that haven't moved from regionserver.wal yet.
 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.replication.regionserver.FSWALEntryStream;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
+import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -92,6 +98,11 @@ public class IOTestProvider implements WALProvider {
   protected AtomicBoolean initialized = new AtomicBoolean(false);
 
   private List<WALActionsListener> listeners = new ArrayList<>();
+
+  private Path oldLogDir;
+
+  private Path rootDir;
+
   /**
    * @param factory factory that made us, identity used for FS layout. may not be null
    * @param conf may not be null
@@ -106,6 +117,8 @@ public class IOTestProvider implements WALProvider {
     this.factory = factory;
     this.conf = conf;
     this.providerId = providerId != null ? providerId : DEFAULT_PROVIDER_ID;
+    rootDir = FSUtils.getRootDir(conf);
+    oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
   }
 
   @Override
@@ -288,4 +301,32 @@ public class IOTestProvider implements WALProvider {
     // TODO Implement WALProvider.addWALActionLister
 
   }
+
+  @Override
+  public WALEntryStream getWalStream(PriorityBlockingQueue<WALIdentity> logQueue,
+      Configuration conf, long startPosition, ServerName serverName, MetricsSource metrics)
+      throws IOException {
+    return new FSWALEntryStream(CommonFSUtils.getWALFileSystem(conf), logQueue, conf, startPosition,
+        serverName, metrics, this);
+  }
+
+  @Override
+  public WALIdentity createWalIdentity(ServerName serverName, String walName, boolean isArchive) {
+    Path walPath;
+    if (isArchive) {
+      walPath = new Path(oldLogDir, walName);
+    } else {
+      Path logDir =
+          new Path(rootDir, AbstractFSWALProvider.getWALDirectoryName(serverName.toString()));
+      walPath = new Path(logDir, walName);
+    }
+    return new FSWALIdentity(walPath);
+  }
+
+  @Override
+  public WALIdentity locateWalId(WALIdentity wal, Server server, List<ServerName> deadRegionServers)
+      throws IOException {
+    return wal;
+  }
+
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
index 8fbe09d..3e73300 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
@@ -151,10 +151,9 @@ public class TestWALFactory {
     TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
         SampleRegionWALCoprocessor.class.getName());
     TEST_UTIL.startMiniDFSCluster(3);
-
+    hbaseWALDir = TEST_UTIL.createWALRootDir();
     conf = TEST_UTIL.getConfiguration();
     cluster = TEST_UTIL.getDFSCluster();
-
     hbaseDir = TEST_UTIL.createRootDir();
     hbaseWALDir = TEST_UTIL.createWALRootDir();
   }
@@ -679,7 +678,7 @@ public class TestWALFactory {
 
   @Test
   public void testWALProviders() throws IOException {
-    Configuration conf = new Configuration();
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
     // if providers are not set but enable SyncReplicationWALProvider by default for master node
     // with not only system tables
     WALFactory walFactory = new WALFactory(conf, this.currentServername.toString());
@@ -695,7 +694,7 @@ public class TestWALFactory {
 
   @Test
   public void testOnlySetWALProvider() throws IOException {
-    Configuration conf = new Configuration();
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
     conf.set(WAL_PROVIDER, WALFactory.Providers.multiwal.name());
     WALFactory walFactory = new WALFactory(conf, this.currentServername.toString());
     WALProvider wrappedWALProvider = ((SyncReplicationWALProvider) walFactory.getWALProvider())
@@ -709,7 +708,7 @@ public class TestWALFactory {
 
   @Test
   public void testOnlySetMetaWALProvider() throws IOException {
-    Configuration conf = new Configuration();
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
     conf.set(META_WAL_PROVIDER, WALFactory.Providers.asyncfs.name());
     WALFactory walFactory = new WALFactory(conf, this.currentServername.toString());
     WALProvider wrappedWALProvider = ((SyncReplicationWALProvider) walFactory.getWALProvider())
@@ -722,7 +721,7 @@ public class TestWALFactory {
 
   @Test
   public void testDefaultProvider() throws IOException {
-    final Configuration conf = new Configuration();
+    final Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
     // AsyncFSWal is the default, we should be able to request any WAL.
     final WALFactory normalWalFactory = new WALFactory(conf, this.currentServername.toString());
     Class<? extends WALProvider> fshLogProvider = normalWalFactory.getProviderClass(
@@ -745,7 +744,7 @@ public class TestWALFactory {
 
   @Test
   public void testCustomProvider() throws IOException {
-    final Configuration config = new Configuration();
+    final Configuration config = new Configuration(TEST_UTIL.getConfiguration());
     config.set(WALFactory.WAL_PROVIDER, IOTestProvider.class.getName());
     final WALFactory walFactory = new WALFactory(config, this.currentServername.toString());
     Class<? extends WALProvider> walProvider = walFactory.getProviderClass(
@@ -757,7 +756,7 @@ public class TestWALFactory {
 
   @Test
   public void testCustomMetaProvider() throws IOException {
-    final Configuration config = new Configuration();
+    final Configuration config = new Configuration(TEST_UTIL.getConfiguration());
     config.set(WALFactory.META_WAL_PROVIDER, IOTestProvider.class.getName());
     final WALFactory walFactory = new WALFactory(config, this.currentServername.toString());
     Class<? extends WALProvider> walProvider = walFactory.getProviderClass(

Reply via email to