This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1 by this push:
     new e0c419d  HBASE-23205 Correctly update the position of WALs currently 
being replicated (#944)
e0c419d is described below

commit e0c419d76d5a97f34500ec60795583a8a2840119
Author: Jeongdae Kim <[email protected]>
AuthorDate: Thu Jan 2 20:56:38 2020 +0900

    HBASE-23205 Correctly update the position of WALs currently being 
replicated (#944)
    
    Signed-off-by: Wellington Chevreuil <[email protected]>
---
 .../regionserver/ReplicationSource.java            |  64 ++++--
 .../regionserver/ReplicationSourceManager.java     |  20 +-
 .../ReplicationSourceWALReaderThread.java          | 138 +++++++-----
 .../hbase/replication/TestReplicationSource.java   | 248 +++++++++++++++++++--
 .../regionserver/TestWALEntryStream.java           | 181 ++++++++++++---
 5 files changed, 500 insertions(+), 151 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 124da63..5acb709 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Service;
@@ -439,14 +440,30 @@ public class ReplicationSource extends Thread implements 
ReplicationSourceInterf
   }
 
   @Override
+  @VisibleForTesting
   public Path getCurrentPath() {
-    // only for testing
     for (ReplicationSourceShipperThread worker : workerThreads.values()) {
       if (worker.getCurrentPath() != null) return worker.getCurrentPath();
     }
     return null;
   }
 
+  @VisibleForTesting
+  public Path getLastLoggedPath() {
+    for (ReplicationSourceShipperThread worker : workerThreads.values()) {
+      return worker.getLastLoggedPath();
+    }
+    return null;
+  }
+
+  @VisibleForTesting
+  public long getLastLoggedPosition() {
+    for (ReplicationSourceShipperThread worker : workerThreads.values()) {
+      return worker.getLastLoggedPosition();
+    }
+    return 0;
+  }
+
   private boolean isSourceActive() {
     return !this.stopper.isStopped() && this.sourceRunning;
   }
@@ -481,8 +498,8 @@ public class ReplicationSource extends Thread implements 
ReplicationSourceInterf
     for (Map.Entry<String, ReplicationSourceShipperThread> entry : 
workerThreads.entrySet()) {
       String walGroupId = entry.getKey();
       ReplicationSourceShipperThread worker = entry.getValue();
-      long position = worker.getCurrentPosition();
-      Path currentPath = worker.getCurrentPath();
+      long position = worker.getLastLoggedPosition();
+      Path currentPath = worker.getLastLoggedPath();
       sb.append("walGroup [").append(walGroupId).append("]: ");
       if (currentPath != null) {
         sb.append("currently replicating from: ").append(currentPath).append(" 
at position: ")
@@ -517,7 +534,7 @@ public class ReplicationSource extends Thread implements 
ReplicationSourceInterf
       int queueSize = queues.get(walGroupId).size();
       replicationDelay =
           ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, 
lastTimeStamp, queueSize);
-      Path currentPath = worker.getCurrentPath();
+      Path currentPath = worker.getLastLoggedPath();
       fileSize = -1;
       if (currentPath != null) {
         try {
@@ -535,7 +552,7 @@ public class ReplicationSource extends Thread implements 
ReplicationSourceInterf
           .withQueueSize(queueSize)
           .withWalGroup(walGroupId)
           .withCurrentPath(currentPath)
-          .withCurrentPosition(worker.getCurrentPosition())
+          .withCurrentPosition(worker.getLastLoggedPosition())
           .withFileSize(fileSize)
           .withAgeOfLastShippedOp(ageOfLastShippedOp)
           .withReplicationDelay(replicationDelay);
@@ -555,7 +572,7 @@ public class ReplicationSource extends Thread implements 
ReplicationSourceInterf
     // Last position in the log that we sent to ZooKeeper
     private long lastLoggedPosition = -1;
     // Path of the current log
-    private volatile Path currentPath;
+    private volatile Path lastLoggedPath;
     // Current state of the worker thread
     private WorkerState state;
     ReplicationSourceWALReaderThread entryReader;
@@ -600,13 +617,11 @@ public class ReplicationSource extends Thread implements 
ReplicationSourceInterf
           WALEntryBatch entryBatch = entryReader.take();
           shipEdits(entryBatch);
           releaseBufferQuota((int) entryBatch.getHeapSize());
-          if (replicationQueueInfo.isQueueRecovered() && 
entryBatch.getWalEntries().isEmpty()
-              && entryBatch.getLastSeqIds().isEmpty()) {
-            LOG.debug("Finished recovering queue for group " + walGroupId + " 
of peer "
-                + peerClusterZnode);
+          if (!entryBatch.hasMoreEntries()) {
+            LOG.debug("Finished recovering queue for group "
+                    + walGroupId + " of peer " + peerClusterZnode);
             metrics.incrCompletedRecoveryQueue();
             setWorkerState(WorkerState.FINISHED);
-            continue;
           }
         } catch (InterruptedException e) {
           LOG.trace("Interrupted while waiting for next replication entry 
batch", e);
@@ -614,7 +629,7 @@ public class ReplicationSource extends Thread implements 
ReplicationSourceInterf
         }
       }
 
-      if (replicationQueueInfo.isQueueRecovered() && getWorkerState() == 
WorkerState.FINISHED) {
+      if (getWorkerState() == WorkerState.FINISHED) {
         // use synchronize to make sure one last thread will clean the queue
         synchronized (this) {
           Threads.sleep(100);// wait a short while for other worker thread to 
fully exit
@@ -694,15 +709,13 @@ public class ReplicationSource extends Thread implements 
ReplicationSourceInterf
     protected void shipEdits(WALEntryBatch entryBatch) {
       List<Entry> entries = entryBatch.getWalEntries();
       long lastReadPosition = entryBatch.getLastWalPosition();
-      currentPath = entryBatch.getLastWalPath();
+      lastLoggedPath = entryBatch.getLastWalPath();
       int sleepMultiplier = 0;
       if (entries.isEmpty()) {
-        if (lastLoggedPosition != lastReadPosition) {
-          updateLogPosition(lastReadPosition);
-          // if there was nothing to ship and it's not an error
-          // set "ageOfLastShippedOp" to <now> to indicate that we're current
-          metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), 
walGroupId);
-        }
+        updateLogPosition(lastReadPosition);
+        // if there was nothing to ship and it's not an error
+        // set "ageOfLastShippedOp" to <now> to indicate that we're current
+        metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), 
walGroupId);
         return;
       }
       int currentSize = (int) entryBatch.getHeapSize();
@@ -787,8 +800,7 @@ public class ReplicationSource extends Thread implements 
ReplicationSourceInterf
     }
 
     private void updateLogPosition(long lastReadPosition) {
-      manager.setPendingShipment(false);
-      manager.logPositionAndCleanOldLogs(currentPath, peerClusterZnode, 
lastReadPosition,
+      manager.logPositionAndCleanOldLogs(lastLoggedPath, peerClusterZnode, 
lastReadPosition,
         this.replicationQueueInfo.isQueueRecovered(), false);
       lastLoggedPosition = lastReadPosition;
     }
@@ -800,7 +812,7 @@ public class ReplicationSource extends Thread implements 
ReplicationSourceInterf
         public void uncaughtException(final Thread t, final Throwable e) {
           RSRpcServices.exitIfOOME(e);
           LOG.error("Unexpected exception in ReplicationSourceWorkerThread," + 
" currentPath="
-              + getCurrentPath(), e);
+              + getLastLoggedPath(), e);
           stopper.stop("Unexpected exception in 
ReplicationSourceWorkerThread");
         }
       };
@@ -941,8 +953,12 @@ public class ReplicationSource extends Thread implements 
ReplicationSourceInterf
       return this.entryReader.getCurrentPath();
     }
 
-    public long getCurrentPosition() {
-      return this.lastLoggedPosition;
+    public Path getLastLoggedPath() {
+      return lastLoggedPath;
+    }
+
+    public long getLastLoggedPosition() {
+      return lastLoggedPosition;
     }
 
     private boolean isWorkerActive() {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 6b8b6e2..0e3724a 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -123,8 +123,6 @@ public class ReplicationSourceManager implements 
ReplicationListener {
 
   private AtomicLong totalBufferUsed = new AtomicLong();
 
-  private boolean pendingShipment;
-
   /**
    * Creates a replication manager and sets the watch on all the other 
registered region servers
    * @param replicationQueues the interface for manipulating replication queues
@@ -191,19 +189,13 @@ public class ReplicationSourceManager implements 
ReplicationListener {
    * @param holdLogInZK if true then the log is retained in ZK
    */
   public synchronized void logPositionAndCleanOldLogs(Path log, String id, 
long position,
-      boolean queueRecovered, boolean holdLogInZK) {
-    if (!this.pendingShipment) {
-      String fileName = log.getName();
-      this.replicationQueues.setLogPosition(id, fileName, position);
-      if (holdLogInZK) {
-        return;
-      }
-      cleanOldLogs(fileName, id, queueRecovered);
+    boolean queueRecovered, boolean holdLogInZK) {
+    String fileName = log.getName();
+    this.replicationQueues.setLogPosition(id, fileName, position);
+    if (holdLogInZK) {
+      return;
     }
-  }
-
-  public synchronized void setPendingShipment(boolean pendingShipment) {
-    this.pendingShipment = pendingShipment;
+    cleanOldLogs(fileName, id, queueRecovered);
   }
 
   /**
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
index 1d94a7a..5855574 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
@@ -21,9 +21,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
@@ -69,7 +67,8 @@ public class ReplicationSourceWALReaderThread extends Thread {
   // max count of each batch - multiply by number of batches in queue to get 
total
   private int replicationBatchCountCapacity;
   // position in the WAL to start reading at
-  private long currentPosition;
+  private long lastReadPosition;
+  private Path lastReadPath;
   private WALEntryFilter filter;
   private long sleepForRetries;
   //Indicates whether this particular worker is running
@@ -81,8 +80,6 @@ public class ReplicationSourceWALReaderThread extends Thread {
   private AtomicLong totalBufferUsed;
   private long totalBufferQuota;
 
-  private ReplicationSourceManager replicationSourceManager;
-
   /**
    * Creates a reader worker for a given WAL queue. Reads WAL entries off a 
given queue, batches the
    * entries, and puts them on a batch queue.
@@ -101,7 +98,8 @@ public class ReplicationSourceWALReaderThread extends Thread 
{
       FileSystem fs, Configuration conf, WALEntryFilter filter, MetricsSource 
metrics) {
     this.replicationQueueInfo = replicationQueueInfo;
     this.logQueue = logQueue;
-    this.currentPosition = startPosition;
+    this.lastReadPath = logQueue.peek();
+    this.lastReadPosition = startPosition;
     this.fs = fs;
     this.conf = conf;
     this.filter = filter;
@@ -111,7 +109,6 @@ public class ReplicationSourceWALReaderThread extends 
Thread {
     // memory used will be batchSizeCapacity * (nb.batches + 1)
     // the +1 is for the current thread reading before placing onto the queue
     int batchCount = conf.getInt("replication.source.nb.batches", 1);
-    this.replicationSourceManager = manager;
     this.totalBufferUsed = manager.getTotalBufferUsed();
     this.totalBufferQuota = 
conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
       HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
@@ -133,61 +130,45 @@ public class ReplicationSourceWALReaderThread extends 
Thread {
     int sleepMultiplier = 1;
     while (isReaderRunning()) { // we only loop back here if something fatal 
happened to our stream
       try (WALEntryStream entryStream =
-          new WALEntryStream(logQueue, fs, conf, currentPosition, metrics)) {
+          new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics)) {
         while (isReaderRunning()) { // loop here to keep reusing stream while 
we can
           if (!checkQuota()) {
             continue;
           }
-          WALEntryBatch batch = null;
-          while (entryStream.hasNext()) {
-            if (batch == null) {
-              batch = new WALEntryBatch(replicationBatchCountCapacity, 
entryStream.getCurrentPath());
-            }
+          WALEntryBatch batch = new 
WALEntryBatch(replicationBatchCountCapacity);
+          boolean hasNext;
+          while ((hasNext = entryStream.hasNext()) == true) {
             Entry entry = entryStream.next();
             entry = filterEntry(entry);
             if (entry != null) {
               WALEdit edit = entry.getEdit();
               if (edit != null && !edit.isEmpty()) {
                 long entrySize = getEntrySizeIncludeBulkLoad(entry);
-                long entrySizeExlucdeBulkLoad = 
getEntrySizeExcludeBulkLoad(entry);
+                long entrySizeExcludeBulkLoad = 
getEntrySizeExcludeBulkLoad(entry);
                 batch.addEntry(entry);
-                replicationSourceManager.setPendingShipment(true);
                 updateBatchStats(batch, entry, entryStream.getPosition(), 
entrySize);
-                boolean totalBufferTooLarge = 
acquireBufferQuota(entrySizeExlucdeBulkLoad);
+                boolean totalBufferTooLarge = 
acquireBufferQuota(entrySizeExcludeBulkLoad);
                 // Stop if too many entries or too big
                 if (totalBufferTooLarge || batch.getHeapSize() >= 
replicationBatchSizeCapacity
                     || batch.getNbEntries() >= replicationBatchCountCapacity) {
                   break;
                 }
               }
-            } else {
-              
replicationSourceManager.logPositionAndCleanOldLogs(entryStream.getCurrentPath(),
-                this.replicationQueueInfo.getPeerClusterZnode(),
-                entryStream.getPosition(),
-                this.replicationQueueInfo.isQueueRecovered(), false);
             }
           }
-          if (batch != null && (!batch.getLastSeqIds().isEmpty() || 
batch.getNbEntries() > 0)) {
-            if (LOG.isTraceEnabled()) {
-              LOG.trace(String.format("Read %s WAL entries eligible for 
replication",
-                batch.getNbEntries()));
-            }
-            entryBatchQueue.put(batch);
+
+          updateBatch(entryStream, batch, hasNext);
+          if (isShippable(batch)) {
             sleepMultiplier = 1;
-          } else { // got no entries and didn't advance position in WAL
-            LOG.trace("Didn't read any new entries from WAL");
-            if (replicationQueueInfo.isQueueRecovered()) {
-              // we're done with queue recovery, shut ourself down
+            entryBatchQueue.put(batch);
+            if (!batch.hasMoreEntries()) {
+              // we're done with queue recovery, shut ourselves down
               setReaderRunning(false);
-              // shuts down shipper thread immediately
-              entryBatchQueue.put(batch != null ? batch
-                  : new WALEntryBatch(replicationBatchCountCapacity, 
entryStream.getCurrentPath()));
-            } else {
-              Thread.sleep(sleepForRetries);
             }
+          } else {
+            Thread.sleep(sleepForRetries);
           }
-          currentPosition = entryStream.getPosition();
-          entryStream.reset(); // reuse stream
+          resetStream(entryStream);
         }
       } catch (IOException | WALEntryStreamRuntimeException e) { // stream 
related
         if (sleepMultiplier < maxRetriesMultiplier) {
@@ -205,6 +186,38 @@ public class ReplicationSourceWALReaderThread extends 
Thread {
     }
   }
 
+  private void updateBatch(WALEntryStream entryStream, WALEntryBatch batch, 
boolean moreData) {
+    logMessage(batch);
+    batch.updatePosition(entryStream);
+    batch.setMoreEntries(!replicationQueueInfo.isQueueRecovered() || moreData);
+  }
+
+  private void logMessage(WALEntryBatch batch) {
+    if (LOG.isTraceEnabled()) {
+      if (batch.isEmpty()) {
+        LOG.trace("Didn't read any new entries from WAL");
+      } else {
+        LOG.trace(String.format("Read %s WAL entries eligible for replication",
+                batch.getNbEntries()));
+      }
+    }
+  }
+
+  private boolean isShippable(WALEntryBatch batch) {
+    return !batch.isEmpty() || checkIfWALRolled(batch) || 
!batch.hasMoreEntries();
+  }
+
+  private boolean checkIfWALRolled(WALEntryBatch batch) {
+    return lastReadPath == null && batch.lastWalPath != null
+      || lastReadPath != null && !lastReadPath.equals(batch.lastWalPath);
+  }
+
+  private void resetStream(WALEntryStream stream) throws IOException {
+    lastReadPosition = stream.getPosition();
+    lastReadPath = stream.getCurrentPath();
+    stream.reset(); // reuse stream
+  }
+
   // if we get an EOF due to a zero-length log, and there are other logs in 
queue
   // (highly likely we've closed the current log), we've hit the max retries, 
and autorecovery is
   // enabled, then dump the log
@@ -214,8 +227,8 @@ public class ReplicationSourceWALReaderThread extends 
Thread {
       try {
         if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
           LOG.warn("Forcing removal of 0 length log in queue: " + 
logQueue.peek());
-          logQueue.remove();
-          currentPosition = 0;
+          lastReadPath = logQueue.remove();
+          lastReadPosition = 0;
         }
       } catch (IOException ioe) {
         LOG.warn("Couldn't get file length information about log " + 
logQueue.peek());
@@ -224,12 +237,6 @@ public class ReplicationSourceWALReaderThread extends 
Thread {
   }
 
   public Path getCurrentPath() {
-    // if we've read some WAL entries, get the Path we read from
-    WALEntryBatch batchQueueHead = entryBatchQueue.peek();
-    if (batchQueueHead != null) {
-      return batchQueueHead.lastWalPath;
-    }
-    // otherwise, we must be currently reading from the head of the log queue
     return logQueue.peek();
   }
 
@@ -380,6 +387,10 @@ public class ReplicationSourceWALReaderThread extends 
Thread {
     this.isReaderRunning = readerRunning;
   }
 
+  public long getLastReadPosition() {
+    return this.lastReadPosition;
+  }
+
   /**
    * Holds a batch of WAL entries to replicate, along with some statistics
    *
@@ -396,17 +407,14 @@ public class ReplicationSourceWALReaderThread extends 
Thread {
     private int nbHFiles = 0;
     // heap size of data we need to replicate
     private long heapSize = 0;
-    // save the last sequenceid for each region if the table has 
serial-replication scope
-    private Map<String, Long> lastSeqIds = new HashMap<>();
+    // whether more entries to read exist in WALs or not
+    private boolean moreEntries = true;
 
     /**
-     * @param walEntries
-     * @param lastWalPath Path of the WAL the last entry in this batch was 
read from
-     * @param lastWalPosition Position in the WAL the last entry in this batch 
was read from
+     * @param maxNbEntries the number of entries a batch can have
      */
-    private WALEntryBatch(int maxNbEntries, Path lastWalPath) {
+    private WALEntryBatch(int maxNbEntries) {
       this.walEntries = new ArrayList<>(maxNbEntries);
-      this.lastWalPath = lastWalPath;
     }
 
     public void addEntry(Entry entry) {
@@ -466,13 +474,6 @@ public class ReplicationSourceWALReaderThread extends 
Thread {
       return heapSize;
     }
 
-    /**
-     * @return the last sequenceid for each region if the table has 
serial-replication scope
-     */
-    public Map<String, Long> getLastSeqIds() {
-      return lastSeqIds;
-    }
-
     private void incrementNbRowKeys(int increment) {
       nbRowKeys += increment;
     }
@@ -484,5 +485,22 @@ public class ReplicationSourceWALReaderThread extends 
Thread {
     private void incrementHeapSize(long increment) {
       heapSize += increment;
     }
+
+    public boolean isEmpty() {
+      return walEntries.isEmpty();
+    }
+
+    public void updatePosition(WALEntryStream entryStream) {
+      lastWalPath = entryStream.getCurrentPath();
+      lastWalPosition = entryStream.getPosition();
+    }
+
+    public boolean hasMoreEntries() {
+      return moreEntries;
+    }
+
+    public void setMoreEntries(boolean moreEntries) {
+      this.moreEntries = moreEntries;
+    }
   }
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
index 5d9059a..b4ac71b 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
@@ -16,13 +16,30 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.hadoop.hbase.replication;
 
+import static 
org.apache.hadoop.hbase.replication.TestReplicationEndpoint.ReplicationEndpointForTest;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.internal.verification.VerificationModeFactory.times;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -34,36 +51,38 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.Waiter.Predicate;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.Waiter.Predicate;
 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import 
org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
 import 
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import 
org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WALProvider;
+import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
-import static org.mockito.Mockito.mock;
-
 @Category(MediumTests.class)
 public class TestReplicationSource {
 
@@ -92,6 +111,32 @@ public class TestReplicationSource {
     if (FS.exists(logDir)) FS.delete(logDir, true);
   }
 
+  @Before
+  public void setup() throws IOException {
+    if (!FS.exists(logDir)) {
+      FS.mkdirs(logDir);
+    }
+    if (!FS.exists(oldLogDir)) {
+      FS.mkdirs(oldLogDir);
+    }
+
+    ReplicationEndpointForTest.contructedCount.set(0);
+    ReplicationEndpointForTest.startedCount.set(0);
+    ReplicationEndpointForTest.replicateCount.set(0);
+    ReplicationEndpointForTest.stoppedCount.set(0);
+    ReplicationEndpointForTest.lastEntries = null;
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (FS.exists(oldLogDir)) {
+      FS.delete(oldLogDir, true);
+    }
+    if (FS.exists(logDir)) {
+      FS.delete(logDir, true);
+    }
+  }
+
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
     TEST_UTIL_PEER.shutdownMiniHBaseCluster();
@@ -108,8 +153,6 @@ public class TestReplicationSource {
   @Test
   public void testLogMoving() throws Exception{
     Path logPath = new Path(logDir, "log");
-    if (!FS.exists(logDir)) FS.mkdirs(logDir);
-    if (!FS.exists(oldLogDir)) FS.mkdirs(oldLogDir);
     WALProvider.Writer writer = WALFactory.createWALWriter(FS, logPath,
         TEST_UTIL.getConfiguration());
     for(int i = 0; i < 3; i++) {
@@ -166,7 +209,6 @@ public class TestReplicationSource {
     Configuration testConf = HBaseConfiguration.create();
     testConf.setInt("replication.source.maxretriesmultiplier", 1);
     ReplicationSourceManager manager = 
Mockito.mock(ReplicationSourceManager.class);
-    Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
     source.init(testConf, null, manager, null, mockPeers, null, "testPeer",
         null, replicationEndpoint, null);
     ExecutorService executor = Executors.newSingleThreadExecutor();
@@ -189,10 +231,184 @@ public class TestReplicationSource {
 
   }
 
+  private void appendEntries(WALProvider.Writer writer, int numEntries) throws 
IOException {
+    for (int i = 0; i < numEntries; i++) {
+      byte[] b = Bytes.toBytes(Integer.toString(i));
+      KeyValue kv = new KeyValue(b,b,b);
+      WALEdit edit = new WALEdit();
+      edit.add(kv);
+      WALKey key = new WALKey(b, TableName.valueOf(b), 0, 0,
+              HConstants.DEFAULT_CLUSTER_ID);
+      NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], 
Integer>(Bytes.BYTES_COMPARATOR);
+      scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
+      key.setScopes(scopes);
+      writer.append(new WAL.Entry(key, edit));
+      writer.sync(false);
+    }
+    writer.close();
+  }
+
+  private long getPosition(WALFactory wals, Path log2, int numEntries) throws 
IOException {
+    WAL.Reader reader = wals.createReader(FS, log2);
+    for (int i = 0; i < numEntries; i++) {
+      reader.next();
+    }
+    return reader.getPosition();
+  }
+
+  private static final class Mocks {
+    private final ReplicationSourceManager manager = 
mock(ReplicationSourceManager.class);
+    private final ReplicationQueues queues = mock(ReplicationQueues.class);
+    private final ReplicationPeers peers = mock(ReplicationPeers.class);
+    private final MetricsSource metrics = mock(MetricsSource.class);
+    private final ReplicationPeer peer = mock(ReplicationPeer.class);
+    private final ReplicationEndpoint.Context context = 
mock(ReplicationEndpoint.Context.class);
+
+    private Mocks() {
+      when(peers.getStatusOfPeer(anyString())).thenReturn(true);
+      when(context.getReplicationPeer()).thenReturn(peer);
+      when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+    }
+
+    ReplicationSource createReplicationSourceWithMocks(ReplicationEndpoint 
endpoint)
+            throws IOException {
+      final ReplicationSource source = new ReplicationSource();
+      endpoint.init(context);
+      source.init(conf, FS, manager, queues, peers, mock(Stoppable.class),
+              "testPeerClusterZnode", UUID.randomUUID(), endpoint, metrics);
+      return source;
+    }
+  }
+
+  @Test
+  public void testSetLogPositionForWALCurrentlyReadingWhenLogsRolled() throws 
Exception {
+    final int numWALEntries = 5;
+    conf.setInt("replication.source.nb.capacity", numWALEntries);
+
+    Mocks mocks = new Mocks();
+    final ReplicationEndpointForTest endpoint = new 
ReplicationEndpointForTest() {
+      @Override
+      public WALEntryFilter getWALEntryfilter() {
+        return null;
+      }
+    };
+    WALFactory wals = new WALFactory(TEST_UTIL.getConfiguration(), null, 
"test");
+    final Path log1 = new Path(logDir, "log.1");
+    final Path log2 = new Path(logDir, "log.2");
+
+    WALProvider.Writer writer1 = WALFactory.createWALWriter(FS, log1, 
TEST_UTIL.getConfiguration());
+    WALProvider.Writer writer2 = WALFactory.createWALWriter(FS, log2, 
TEST_UTIL.getConfiguration());
+
+    appendEntries(writer1, 3);
+    appendEntries(writer2, 2);
+
+    long pos = getPosition(wals, log2, 2);
+
+    final ReplicationSource source = 
mocks.createReplicationSourceWithMocks(endpoint);
+    source.run();
+
+    source.enqueueLog(log1);
+    // log rolled
+    source.enqueueLog(log2);
+
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() throws Exception {
+        return endpoint.replicateCount.get() > 0;
+      }
+    });
+
+    ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
+    ArgumentCaptor<Long> positionCaptor = ArgumentCaptor.forClass(Long.class);
+    verify(mocks.manager, times(1))
+        .logPositionAndCleanOldLogs(pathCaptor.capture(), anyString(), 
positionCaptor.capture(),
+              anyBoolean(), anyBoolean());
+    assertTrue(endpoint.lastEntries.size() == 5);
+    assertThat(pathCaptor.getValue(), is(log2));
+    assertThat(positionCaptor.getValue(), is(pos));
+  }
+
+  @Test
+  public void testSetLogPositionAndRemoveOldWALsEvenIfEmptyWALsRolled() throws 
Exception {
+    Mocks mocks = new Mocks();
+
+    final ReplicationEndpointForTest endpoint = new 
ReplicationEndpointForTest();
+    final ReplicationSource source = 
mocks.createReplicationSourceWithMocks(endpoint);
+    WALFactory wals = new WALFactory(TEST_UTIL.getConfiguration(), null, 
"test");
+
+    final Path log1 = new Path(logDir, "log.1");
+    final Path log2 = new Path(logDir, "log.2");
+
+    WALFactory.createWALWriter(FS, log1, TEST_UTIL.getConfiguration()).close();
+    WALFactory.createWALWriter(FS, log2, TEST_UTIL.getConfiguration()).close();
+    final long startPos = getPosition(wals, log2, 0);
+
+    source.run();
+    source.enqueueLog(log1);
+    source.enqueueLog(log2);
+
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() throws Exception {
+        return log2.equals(source.getLastLoggedPath())
+                && source.getLastLoggedPosition() >= startPos;
+      }
+    });
+
+    ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
+    ArgumentCaptor<Long> positionCaptor = ArgumentCaptor.forClass(Long.class);
+
+    verify(mocks.manager, times(1))
+            .logPositionAndCleanOldLogs(pathCaptor.capture(), anyString(), 
positionCaptor.capture(),
+                    anyBoolean(), anyBoolean());
+    assertThat(pathCaptor.getValue(), is(log2));
+    assertThat(positionCaptor.getValue(), is(startPos));
+  }
+
+  @Test
+  public void testSetLogPositionAndRemoveOldWALsEvenIfNoCfsReplicated() throws 
Exception {
+    Mocks mocks = new Mocks();
+    // set table cfs to filter all cells out
+    final TableName replicatedTable = TableName.valueOf("replicated_table");
+    final Map<TableName, List<String>> cfs =
+            Collections.singletonMap(replicatedTable, 
Collections.<String>emptyList());
+    when(mocks.peer.getTableCFs()).thenReturn(cfs);
+
+    WALFactory wals = new WALFactory(TEST_UTIL.getConfiguration(), null, 
"test");
+    final Path log1 = new Path(logDir, "log.1");
+    final Path log2 = new Path(logDir, "log.2");
+
+    WALProvider.Writer writer1 = WALFactory.createWALWriter(FS, log1, 
TEST_UTIL.getConfiguration());
+    WALProvider.Writer writer2 = WALFactory.createWALWriter(FS, log2, 
TEST_UTIL.getConfiguration());
+
+    appendEntries(writer1, 3);
+    appendEntries(writer2, 2);
+    final long pos = getPosition(wals, log2, 2);
+
+    final ReplicationEndpointForTest endpoint = new 
ReplicationEndpointForTest();
+    final ReplicationSource source = 
mocks.createReplicationSourceWithMocks(endpoint);
+    source.enqueueLog(log1);
+    source.enqueueLog(log2);
+    source.run();
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() throws Exception {
+        // wait until reader read all cells
+        return log2.equals(source.getLastLoggedPath()) && 
source.getLastLoggedPosition() >= pos;
+      }
+    });
+
+    ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
+    ArgumentCaptor<Long> positionCaptor = ArgumentCaptor.forClass(Long.class);
+
+    // all old wals should be removed by updating wal position, even if all 
cells are filtered out.
+    verify(mocks.manager, times(1))
+        .logPositionAndCleanOldLogs(pathCaptor.capture(), anyString(), 
positionCaptor.capture(),
+              anyBoolean(), anyBoolean());
+    assertThat(pathCaptor.getValue(), is(log2));
+    assertThat(positionCaptor.getValue(), is(pos));
+  }
+
   /**
    * Tests that recovered queues are preserved on a regionserver shutdown.
    * See HBASE-18192
-   * @throws Exception
    */
   @Test
   public void testServerShutdownRecoveredQueue() throws Exception {
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 9f077da..7ad7260 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -23,17 +23,15 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -47,17 +45,22 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
+import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
+import org.apache.hadoop.hbase.replication.TableCfWALEntryFilter;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import 
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -77,7 +80,6 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 import org.junit.runner.RunWith;
-import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 import org.mockito.runners.MockitoJUnitRunner;
 
@@ -358,8 +360,9 @@ public class TestWALEntryStream {
     // start up a batcher
     ReplicationSourceManager mockSourceManager = 
Mockito.mock(ReplicationSourceManager.class);
     when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
-    ReplicationSourceWALReaderThread batcher = new 
ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(),walQueue, 0,
-        fs, conf, getDummyFilter(), new MetricsSource("1"));
+    ReplicationSourceWALReaderThread batcher =
+            new ReplicationSourceWALReaderThread(mockSourceManager, 
getQueueInfo(),walQueue, 0,
+                    fs, conf, getDummyFilter(), new MetricsSource("1"));
     Path walPath = walQueue.peek();
     batcher.start();
     WALEntryBatch entryBatch = batcher.take();
@@ -378,37 +381,36 @@ public class TestWALEntryStream {
   }
 
   @Test
-  public void testReplicationSourceUpdatesLogPositionOnFilteredEntries() 
throws Exception {
+  public void testReplicationSourceWALReaderThreadRecoveredQueue() throws 
Exception {
     appendEntriesToLog(3);
-    // get ending position
+    log.rollWriter();
+    appendEntriesToLog(2);
+
     long position;
-    try (WALEntryStream entryStream =
-      new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
+    try (WALEntryStream entryStream = new WALEntryStream(new 
PriorityBlockingQueue<>(walQueue),
+            fs, conf, new MetricsSource("1"))) {
+      entryStream.next();
+      entryStream.next();
       entryStream.next();
       entryStream.next();
       entryStream.next();
       position = entryStream.getPosition();
     }
-    // start up a readerThread with a WALEntryFilter that always filter the 
entries
-    ReplicationSourceManager mockSourceManager = 
Mockito.mock(ReplicationSourceManager.class);
+
+    ReplicationSourceManager mockSourceManager = 
mock(ReplicationSourceManager.class);
     when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
-    ReplicationSourceWALReaderThread readerThread = new 
ReplicationSourceWALReaderThread(
-      mockSourceManager, getQueueInfo(), walQueue, 0, fs, conf, new 
WALEntryFilter() {
-        @Override
-        public Entry filter(Entry entry) {
-          return null;
-        }
-      }, new MetricsSource("1"));
-    readerThread.start();
-    Thread.sleep(100);
-    ArgumentCaptor<Long> positionCaptor = ArgumentCaptor.forClass(Long.class);
-    verify(mockSourceManager, times(3))
-      .logPositionAndCleanOldLogs(any(Path.class),
-        anyString(),
-        positionCaptor.capture(),
-        anyBoolean(),
-        anyBoolean());
-    assertEquals(position, positionCaptor.getValue().longValue());
+    ReplicationSourceWALReaderThread reader =
+            new ReplicationSourceWALReaderThread(mockSourceManager, 
getRecoveredQueueInfo(),
+                    walQueue, 0, fs, conf, getDummyFilter(), new 
MetricsSource("1"));
+    Path walPath = walQueue.toArray(new Path[2])[1];
+    reader.start();
+    WALEntryBatch entryBatch = reader.take();
+
+    assertNotNull(entryBatch);
+    assertEquals(5, entryBatch.getWalEntries().size());
+    assertEquals(position, entryBatch.getLastWalPosition());
+    assertEquals(walPath, entryBatch.getLastWalPath());
+    assertFalse(entryBatch.hasMoreEntries());
   }
 
   @Test
@@ -436,6 +438,96 @@ public class TestWALEntryStream {
     }
   }
 
+  @Test
+  public void testReplicationSourceWALReaderThreadWithFilter() throws 
Exception {
+    final byte[] notReplicatedCf = Bytes.toBytes("notReplicated");
+    final Map<TableName, List<String>> tableCfs = new HashMap<>();
+    tableCfs.put(tableName, Collections.singletonList(Bytes.toString(family)));
+    ReplicationPeer peer = mock(ReplicationPeer.class);
+    when(peer.getTableCFs()).thenReturn(tableCfs);
+    WALEntryFilter filter = new ChainWALEntryFilter(new 
TableCfWALEntryFilter(peer));
+
+    // add filterable entries
+    appendToLogPlus(3, notReplicatedCf);
+    appendToLogPlus(3, notReplicatedCf);
+    appendToLogPlus(3, notReplicatedCf);
+
+    // add non filterable entries
+    appendEntriesToLog(2);
+
+    ReplicationSourceManager mockSourceManager = 
mock(ReplicationSourceManager.class);
+    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+    final ReplicationSourceWALReaderThread reader =
+            new ReplicationSourceWALReaderThread(mockSourceManager, 
getQueueInfo(), walQueue,
+                    0, fs, conf, filter, new MetricsSource("1"));
+    reader.start();
+
+    WALEntryBatch entryBatch = reader.take();
+
+    assertNotNull(entryBatch);
+    assertFalse(entryBatch.isEmpty());
+    List<Entry> walEntries = entryBatch.getWalEntries();
+    assertEquals(2, walEntries.size());
+    for (Entry entry : walEntries) {
+      ArrayList<Cell> cells = entry.getEdit().getCells();
+      assertTrue(cells.size() == 1);
+      assertTrue(CellUtil.matchingFamily(cells.get(0), family));
+    }
+  }
+
+  @Test
+  public void testReplicationSourceWALReaderThreadWithFilterWhenLogRolled() 
throws Exception {
+    final byte[] notReplicatedCf = Bytes.toBytes("notReplicated");
+    final Map<TableName, List<String>> tableCfs = new HashMap<>();
+    tableCfs.put(tableName, Collections.singletonList(Bytes.toString(family)));
+    ReplicationPeer peer = mock(ReplicationPeer.class);
+    when(peer.getTableCFs()).thenReturn(tableCfs);
+    WALEntryFilter filter = new ChainWALEntryFilter(new 
TableCfWALEntryFilter(peer));
+
+    appendToLogPlus(3, notReplicatedCf);
+
+    Path firstWAL = walQueue.peek();
+    final long eof = getPosition(firstWAL);
+
+    ReplicationSourceManager mockSourceManager = 
mock(ReplicationSourceManager.class);
+    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+    final ReplicationSourceWALReaderThread reader =
+            new ReplicationSourceWALReaderThread(mockSourceManager, 
getQueueInfo(), walQueue,
+                    0, fs, conf, filter, new MetricsSource("1"));
+    reader.start();
+
+    // reader won't put any batch, even if EOF reached.
+    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+      @Override public boolean evaluate() {
+        return reader.getLastReadPosition() >= eof;
+      }
+    });
+    assertNull(reader.poll(0));
+
+    log.rollWriter();
+
+    // should get empty batch with current wal position, after wal rolled
+    WALEntryBatch entryBatch = reader.take();
+
+    Path lastWAL= walQueue.peek();
+    long positionToBeLogged = getPosition(lastWAL);
+
+    assertNotNull(entryBatch);
+    assertTrue(entryBatch.isEmpty());
+    assertEquals(1, walQueue.size());
+    assertNotEquals(firstWAL, entryBatch.getLastWalPath());
+    assertEquals(lastWAL, entryBatch.getLastWalPath());
+    assertEquals(positionToBeLogged, entryBatch.getLastWalPosition());
+  }
+
+  private long getPosition(Path walPath) throws IOException {
+    WALEntryStream entryStream =
+            new WALEntryStream(new 
PriorityBlockingQueue<>(Collections.singletonList(walPath)),
+                    fs, conf, new MetricsSource("1"));
+    entryStream.hasNext();
+    return entryStream.getPosition();
+  }
+
   private String getRow(WAL.Entry entry) {
     Cell cell = entry.getEdit().getCells().get(0);
     return Bytes.toString(cell.getRowArray(), cell.getRowOffset(), 
cell.getRowLength());
@@ -459,17 +551,25 @@ public class TestWALEntryStream {
   }
 
   private void appendToLogPlus(int count) throws IOException {
+    appendToLogPlus(count, family, qualifier);
+  }
+
+  private void appendToLogPlus(int count, byte[] cf) throws IOException {
+    appendToLogPlus(count, cf, qualifier);
+  }
+
+  private void appendToLogPlus(int count, byte[] cf, byte[] cq) throws 
IOException {
     final long txid = log.append(htd, info,
       new WALKey(info.getEncodedNameAsBytes(), tableName, 
System.currentTimeMillis(), mvcc),
-      getWALEdits(count), true);
+      getWALEdits(count, cf, cq), true);
     log.sync(txid);
   }
 
-  private WALEdit getWALEdits(int count) {
+  private WALEdit getWALEdits(int count, byte[] cf, byte[] cq) {
     WALEdit edit = new WALEdit();
     for (int i = 0; i < count; i++) {
-      edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, 
qualifier,
-          System.currentTimeMillis(), qualifier));
+      edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), cf, cq,
+          System.currentTimeMillis(), cq));
     }
     return edit;
   }
@@ -491,8 +591,16 @@ public class TestWALEntryStream {
     };
   }
 
+  private ReplicationQueueInfo getRecoveredQueueInfo() {
+    return getQueueInfo("1-1");
+  }
+
   private ReplicationQueueInfo getQueueInfo() {
-    return new ReplicationQueueInfo("1");
+    return getQueueInfo("1");
+  }
+
+  private ReplicationQueueInfo getQueueInfo(String znode) {
+    return new ReplicationQueueInfo(znode);
   }
 
   class PathWatcher extends WALActionsListener.Base {
@@ -505,5 +613,4 @@ public class TestWALEntryStream {
       currentPath = newPath;
     }
   }
-
 }

Reply via email to