This is an automated email from the ASF dual-hosted git repository.
wchevreuil pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-1 by this push:
new e0c419d HBASE-23205 Correctly update the position of WALs currently
being replicated (#944)
e0c419d is described below
commit e0c419d76d5a97f34500ec60795583a8a2840119
Author: Jeongdae Kim <[email protected]>
AuthorDate: Thu Jan 2 20:56:38 2020 +0900
HBASE-23205 Correctly update the position of WALs currently being
replicated (#944)
Signed-off-by: Wellington Chevreuil <[email protected]>
---
.../regionserver/ReplicationSource.java | 64 ++++--
.../regionserver/ReplicationSourceManager.java | 20 +-
.../ReplicationSourceWALReaderThread.java | 138 +++++++-----
.../hbase/replication/TestReplicationSource.java | 248 +++++++++++++++++++--
.../regionserver/TestWALEntryStream.java | 181 ++++++++++++---
5 files changed, 500 insertions(+), 151 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 124da63..5acb709 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
@@ -439,14 +440,30 @@ public class ReplicationSource extends Thread implements
ReplicationSourceInterf
}
@Override
+ @VisibleForTesting
public Path getCurrentPath() {
- // only for testing
for (ReplicationSourceShipperThread worker : workerThreads.values()) {
if (worker.getCurrentPath() != null) return worker.getCurrentPath();
}
return null;
}
+ @VisibleForTesting
+ public Path getLastLoggedPath() {
+ for (ReplicationSourceShipperThread worker : workerThreads.values()) {
+ return worker.getLastLoggedPath();
+ }
+ return null;
+ }
+
+ @VisibleForTesting
+ public long getLastLoggedPosition() {
+ for (ReplicationSourceShipperThread worker : workerThreads.values()) {
+ return worker.getLastLoggedPosition();
+ }
+ return 0;
+ }
+
private boolean isSourceActive() {
return !this.stopper.isStopped() && this.sourceRunning;
}
@@ -481,8 +498,8 @@ public class ReplicationSource extends Thread implements
ReplicationSourceInterf
for (Map.Entry<String, ReplicationSourceShipperThread> entry :
workerThreads.entrySet()) {
String walGroupId = entry.getKey();
ReplicationSourceShipperThread worker = entry.getValue();
- long position = worker.getCurrentPosition();
- Path currentPath = worker.getCurrentPath();
+ long position = worker.getLastLoggedPosition();
+ Path currentPath = worker.getLastLoggedPath();
sb.append("walGroup [").append(walGroupId).append("]: ");
if (currentPath != null) {
sb.append("currently replicating from: ").append(currentPath).append("
at position: ")
@@ -517,7 +534,7 @@ public class ReplicationSource extends Thread implements
ReplicationSourceInterf
int queueSize = queues.get(walGroupId).size();
replicationDelay =
ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp,
lastTimeStamp, queueSize);
- Path currentPath = worker.getCurrentPath();
+ Path currentPath = worker.getLastLoggedPath();
fileSize = -1;
if (currentPath != null) {
try {
@@ -535,7 +552,7 @@ public class ReplicationSource extends Thread implements
ReplicationSourceInterf
.withQueueSize(queueSize)
.withWalGroup(walGroupId)
.withCurrentPath(currentPath)
- .withCurrentPosition(worker.getCurrentPosition())
+ .withCurrentPosition(worker.getLastLoggedPosition())
.withFileSize(fileSize)
.withAgeOfLastShippedOp(ageOfLastShippedOp)
.withReplicationDelay(replicationDelay);
@@ -555,7 +572,7 @@ public class ReplicationSource extends Thread implements
ReplicationSourceInterf
// Last position in the log that we sent to ZooKeeper
private long lastLoggedPosition = -1;
// Path of the current log
- private volatile Path currentPath;
+ private volatile Path lastLoggedPath;
// Current state of the worker thread
private WorkerState state;
ReplicationSourceWALReaderThread entryReader;
@@ -600,13 +617,11 @@ public class ReplicationSource extends Thread implements
ReplicationSourceInterf
WALEntryBatch entryBatch = entryReader.take();
shipEdits(entryBatch);
releaseBufferQuota((int) entryBatch.getHeapSize());
- if (replicationQueueInfo.isQueueRecovered() &&
entryBatch.getWalEntries().isEmpty()
- && entryBatch.getLastSeqIds().isEmpty()) {
- LOG.debug("Finished recovering queue for group " + walGroupId + "
of peer "
- + peerClusterZnode);
+ if (!entryBatch.hasMoreEntries()) {
+ LOG.debug("Finished recovering queue for group "
+ + walGroupId + " of peer " + peerClusterZnode);
metrics.incrCompletedRecoveryQueue();
setWorkerState(WorkerState.FINISHED);
- continue;
}
} catch (InterruptedException e) {
LOG.trace("Interrupted while waiting for next replication entry
batch", e);
@@ -614,7 +629,7 @@ public class ReplicationSource extends Thread implements
ReplicationSourceInterf
}
}
- if (replicationQueueInfo.isQueueRecovered() && getWorkerState() ==
WorkerState.FINISHED) {
+ if (getWorkerState() == WorkerState.FINISHED) {
// use synchronize to make sure one last thread will clean the queue
synchronized (this) {
Threads.sleep(100);// wait a short while for other worker thread to
fully exit
@@ -694,15 +709,13 @@ public class ReplicationSource extends Thread implements
ReplicationSourceInterf
protected void shipEdits(WALEntryBatch entryBatch) {
List<Entry> entries = entryBatch.getWalEntries();
long lastReadPosition = entryBatch.getLastWalPosition();
- currentPath = entryBatch.getLastWalPath();
+ lastLoggedPath = entryBatch.getLastWalPath();
int sleepMultiplier = 0;
if (entries.isEmpty()) {
- if (lastLoggedPosition != lastReadPosition) {
- updateLogPosition(lastReadPosition);
- // if there was nothing to ship and it's not an error
- // set "ageOfLastShippedOp" to <now> to indicate that we're current
- metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(),
walGroupId);
- }
+ updateLogPosition(lastReadPosition);
+ // if there was nothing to ship and it's not an error
+ // set "ageOfLastShippedOp" to <now> to indicate that we're current
+ metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(),
walGroupId);
return;
}
int currentSize = (int) entryBatch.getHeapSize();
@@ -787,8 +800,7 @@ public class ReplicationSource extends Thread implements
ReplicationSourceInterf
}
private void updateLogPosition(long lastReadPosition) {
- manager.setPendingShipment(false);
- manager.logPositionAndCleanOldLogs(currentPath, peerClusterZnode,
lastReadPosition,
+ manager.logPositionAndCleanOldLogs(lastLoggedPath, peerClusterZnode,
lastReadPosition,
this.replicationQueueInfo.isQueueRecovered(), false);
lastLoggedPosition = lastReadPosition;
}
@@ -800,7 +812,7 @@ public class ReplicationSource extends Thread implements
ReplicationSourceInterf
public void uncaughtException(final Thread t, final Throwable e) {
RSRpcServices.exitIfOOME(e);
LOG.error("Unexpected exception in ReplicationSourceWorkerThread," +
" currentPath="
- + getCurrentPath(), e);
+ + getLastLoggedPath(), e);
stopper.stop("Unexpected exception in
ReplicationSourceWorkerThread");
}
};
@@ -941,8 +953,12 @@ public class ReplicationSource extends Thread implements
ReplicationSourceInterf
return this.entryReader.getCurrentPath();
}
- public long getCurrentPosition() {
- return this.lastLoggedPosition;
+ public Path getLastLoggedPath() {
+ return lastLoggedPath;
+ }
+
+ public long getLastLoggedPosition() {
+ return lastLoggedPosition;
}
private boolean isWorkerActive() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 6b8b6e2..0e3724a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -123,8 +123,6 @@ public class ReplicationSourceManager implements
ReplicationListener {
private AtomicLong totalBufferUsed = new AtomicLong();
- private boolean pendingShipment;
-
/**
* Creates a replication manager and sets the watch on all the other
registered region servers
* @param replicationQueues the interface for manipulating replication queues
@@ -191,19 +189,13 @@ public class ReplicationSourceManager implements
ReplicationListener {
* @param holdLogInZK if true then the log is retained in ZK
*/
public synchronized void logPositionAndCleanOldLogs(Path log, String id,
long position,
- boolean queueRecovered, boolean holdLogInZK) {
- if (!this.pendingShipment) {
- String fileName = log.getName();
- this.replicationQueues.setLogPosition(id, fileName, position);
- if (holdLogInZK) {
- return;
- }
- cleanOldLogs(fileName, id, queueRecovered);
+ boolean queueRecovered, boolean holdLogInZK) {
+ String fileName = log.getName();
+ this.replicationQueues.setLogPosition(id, fileName, position);
+ if (holdLogInZK) {
+ return;
}
- }
-
- public synchronized void setPendingShipment(boolean pendingShipment) {
- this.pendingShipment = pendingShipment;
+ cleanOldLogs(fileName, id, queueRecovered);
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
index 1d94a7a..5855574 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
@@ -21,9 +21,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
@@ -69,7 +67,8 @@ public class ReplicationSourceWALReaderThread extends Thread {
// max count of each batch - multiply by number of batches in queue to get
total
private int replicationBatchCountCapacity;
// position in the WAL to start reading at
- private long currentPosition;
+ private long lastReadPosition;
+ private Path lastReadPath;
private WALEntryFilter filter;
private long sleepForRetries;
//Indicates whether this particular worker is running
@@ -81,8 +80,6 @@ public class ReplicationSourceWALReaderThread extends Thread {
private AtomicLong totalBufferUsed;
private long totalBufferQuota;
- private ReplicationSourceManager replicationSourceManager;
-
/**
* Creates a reader worker for a given WAL queue. Reads WAL entries off a
given queue, batches the
* entries, and puts them on a batch queue.
@@ -101,7 +98,8 @@ public class ReplicationSourceWALReaderThread extends Thread
{
FileSystem fs, Configuration conf, WALEntryFilter filter, MetricsSource
metrics) {
this.replicationQueueInfo = replicationQueueInfo;
this.logQueue = logQueue;
- this.currentPosition = startPosition;
+ this.lastReadPath = logQueue.peek();
+ this.lastReadPosition = startPosition;
this.fs = fs;
this.conf = conf;
this.filter = filter;
@@ -111,7 +109,6 @@ public class ReplicationSourceWALReaderThread extends
Thread {
// memory used will be batchSizeCapacity * (nb.batches + 1)
// the +1 is for the current thread reading before placing onto the queue
int batchCount = conf.getInt("replication.source.nb.batches", 1);
- this.replicationSourceManager = manager;
this.totalBufferUsed = manager.getTotalBufferUsed();
this.totalBufferQuota =
conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
@@ -133,61 +130,45 @@ public class ReplicationSourceWALReaderThread extends
Thread {
int sleepMultiplier = 1;
while (isReaderRunning()) { // we only loop back here if something fatal
happened to our stream
try (WALEntryStream entryStream =
- new WALEntryStream(logQueue, fs, conf, currentPosition, metrics)) {
+ new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics)) {
while (isReaderRunning()) { // loop here to keep reusing stream while
we can
if (!checkQuota()) {
continue;
}
- WALEntryBatch batch = null;
- while (entryStream.hasNext()) {
- if (batch == null) {
- batch = new WALEntryBatch(replicationBatchCountCapacity,
entryStream.getCurrentPath());
- }
+ WALEntryBatch batch = new
WALEntryBatch(replicationBatchCountCapacity);
+ boolean hasNext;
+ while ((hasNext = entryStream.hasNext()) == true) {
Entry entry = entryStream.next();
entry = filterEntry(entry);
if (entry != null) {
WALEdit edit = entry.getEdit();
if (edit != null && !edit.isEmpty()) {
long entrySize = getEntrySizeIncludeBulkLoad(entry);
- long entrySizeExlucdeBulkLoad =
getEntrySizeExcludeBulkLoad(entry);
+ long entrySizeExcludeBulkLoad =
getEntrySizeExcludeBulkLoad(entry);
batch.addEntry(entry);
- replicationSourceManager.setPendingShipment(true);
updateBatchStats(batch, entry, entryStream.getPosition(),
entrySize);
- boolean totalBufferTooLarge =
acquireBufferQuota(entrySizeExlucdeBulkLoad);
+ boolean totalBufferTooLarge =
acquireBufferQuota(entrySizeExcludeBulkLoad);
// Stop if too many entries or too big
if (totalBufferTooLarge || batch.getHeapSize() >=
replicationBatchSizeCapacity
|| batch.getNbEntries() >= replicationBatchCountCapacity) {
break;
}
}
- } else {
-
replicationSourceManager.logPositionAndCleanOldLogs(entryStream.getCurrentPath(),
- this.replicationQueueInfo.getPeerClusterZnode(),
- entryStream.getPosition(),
- this.replicationQueueInfo.isQueueRecovered(), false);
}
}
- if (batch != null && (!batch.getLastSeqIds().isEmpty() ||
batch.getNbEntries() > 0)) {
- if (LOG.isTraceEnabled()) {
- LOG.trace(String.format("Read %s WAL entries eligible for
replication",
- batch.getNbEntries()));
- }
- entryBatchQueue.put(batch);
+
+ updateBatch(entryStream, batch, hasNext);
+ if (isShippable(batch)) {
sleepMultiplier = 1;
- } else { // got no entries and didn't advance position in WAL
- LOG.trace("Didn't read any new entries from WAL");
- if (replicationQueueInfo.isQueueRecovered()) {
- // we're done with queue recovery, shut ourself down
+ entryBatchQueue.put(batch);
+ if (!batch.hasMoreEntries()) {
+ // we're done with queue recovery, shut ourselves down
setReaderRunning(false);
- // shuts down shipper thread immediately
- entryBatchQueue.put(batch != null ? batch
- : new WALEntryBatch(replicationBatchCountCapacity,
entryStream.getCurrentPath()));
- } else {
- Thread.sleep(sleepForRetries);
}
+ } else {
+ Thread.sleep(sleepForRetries);
}
- currentPosition = entryStream.getPosition();
- entryStream.reset(); // reuse stream
+ resetStream(entryStream);
}
} catch (IOException | WALEntryStreamRuntimeException e) { // stream
related
if (sleepMultiplier < maxRetriesMultiplier) {
@@ -205,6 +186,38 @@ public class ReplicationSourceWALReaderThread extends
Thread {
}
}
+ private void updateBatch(WALEntryStream entryStream, WALEntryBatch batch,
boolean moreData) {
+ logMessage(batch);
+ batch.updatePosition(entryStream);
+ batch.setMoreEntries(!replicationQueueInfo.isQueueRecovered() || moreData);
+ }
+
+ private void logMessage(WALEntryBatch batch) {
+ if (LOG.isTraceEnabled()) {
+ if (batch.isEmpty()) {
+ LOG.trace("Didn't read any new entries from WAL");
+ } else {
+ LOG.trace(String.format("Read %s WAL entries eligible for replication",
+ batch.getNbEntries()));
+ }
+ }
+ }
+
+ private boolean isShippable(WALEntryBatch batch) {
+ return !batch.isEmpty() || checkIfWALRolled(batch) ||
!batch.hasMoreEntries();
+ }
+
+ private boolean checkIfWALRolled(WALEntryBatch batch) {
+ return lastReadPath == null && batch.lastWalPath != null
+ || lastReadPath != null && !lastReadPath.equals(batch.lastWalPath);
+ }
+
+ private void resetStream(WALEntryStream stream) throws IOException {
+ lastReadPosition = stream.getPosition();
+ lastReadPath = stream.getCurrentPath();
+ stream.reset(); // reuse stream
+ }
+
// if we get an EOF due to a zero-length log, and there are other logs in
queue
// (highly likely we've closed the current log), we've hit the max retries,
and autorecovery is
// enabled, then dump the log
@@ -214,8 +227,8 @@ public class ReplicationSourceWALReaderThread extends
Thread {
try {
if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
LOG.warn("Forcing removal of 0 length log in queue: " +
logQueue.peek());
- logQueue.remove();
- currentPosition = 0;
+ lastReadPath = logQueue.remove();
+ lastReadPosition = 0;
}
} catch (IOException ioe) {
LOG.warn("Couldn't get file length information about log " +
logQueue.peek());
@@ -224,12 +237,6 @@ public class ReplicationSourceWALReaderThread extends
Thread {
}
public Path getCurrentPath() {
- // if we've read some WAL entries, get the Path we read from
- WALEntryBatch batchQueueHead = entryBatchQueue.peek();
- if (batchQueueHead != null) {
- return batchQueueHead.lastWalPath;
- }
- // otherwise, we must be currently reading from the head of the log queue
return logQueue.peek();
}
@@ -380,6 +387,10 @@ public class ReplicationSourceWALReaderThread extends
Thread {
this.isReaderRunning = readerRunning;
}
+ public long getLastReadPosition() {
+ return this.lastReadPosition;
+ }
+
/**
* Holds a batch of WAL entries to replicate, along with some statistics
*
@@ -396,17 +407,14 @@ public class ReplicationSourceWALReaderThread extends
Thread {
private int nbHFiles = 0;
// heap size of data we need to replicate
private long heapSize = 0;
- // save the last sequenceid for each region if the table has
serial-replication scope
- private Map<String, Long> lastSeqIds = new HashMap<>();
+ // whether more entries to read exist in WALs or not
+ private boolean moreEntries = true;
/**
- * @param walEntries
- * @param lastWalPath Path of the WAL the last entry in this batch was
read from
- * @param lastWalPosition Position in the WAL the last entry in this batch
was read from
+ * @param maxNbEntries the number of entries a batch can have
*/
- private WALEntryBatch(int maxNbEntries, Path lastWalPath) {
+ private WALEntryBatch(int maxNbEntries) {
this.walEntries = new ArrayList<>(maxNbEntries);
- this.lastWalPath = lastWalPath;
}
public void addEntry(Entry entry) {
@@ -466,13 +474,6 @@ public class ReplicationSourceWALReaderThread extends
Thread {
return heapSize;
}
- /**
- * @return the last sequenceid for each region if the table has
serial-replication scope
- */
- public Map<String, Long> getLastSeqIds() {
- return lastSeqIds;
- }
-
private void incrementNbRowKeys(int increment) {
nbRowKeys += increment;
}
@@ -484,5 +485,22 @@ public class ReplicationSourceWALReaderThread extends
Thread {
private void incrementHeapSize(long increment) {
heapSize += increment;
}
+
+ public boolean isEmpty() {
+ return walEntries.isEmpty();
+ }
+
+ public void updatePosition(WALEntryStream entryStream) {
+ lastWalPath = entryStream.getCurrentPath();
+ lastWalPosition = entryStream.getPosition();
+ }
+
+ public boolean hasMoreEntries() {
+ return moreEntries;
+ }
+
+ public void setMoreEntries(boolean moreEntries) {
+ this.moreEntries = moreEntries;
+ }
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
index 5d9059a..b4ac71b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
@@ -16,13 +16,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.hadoop.hbase.replication;
+import static
org.apache.hadoop.hbase.replication.TestReplicationEndpoint.ReplicationEndpointForTest;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.internal.verification.VerificationModeFactory.times;
import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -34,36 +51,38 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CoordinatedStateManager;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import
org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
import
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import
org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WALProvider;
+import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
-import static org.mockito.Mockito.mock;
-
@Category(MediumTests.class)
public class TestReplicationSource {
@@ -92,6 +111,32 @@ public class TestReplicationSource {
if (FS.exists(logDir)) FS.delete(logDir, true);
}
+ @Before
+ public void setup() throws IOException {
+ if (!FS.exists(logDir)) {
+ FS.mkdirs(logDir);
+ }
+ if (!FS.exists(oldLogDir)) {
+ FS.mkdirs(oldLogDir);
+ }
+
+ ReplicationEndpointForTest.contructedCount.set(0);
+ ReplicationEndpointForTest.startedCount.set(0);
+ ReplicationEndpointForTest.replicateCount.set(0);
+ ReplicationEndpointForTest.stoppedCount.set(0);
+ ReplicationEndpointForTest.lastEntries = null;
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ if (FS.exists(oldLogDir)) {
+ FS.delete(oldLogDir, true);
+ }
+ if (FS.exists(logDir)) {
+ FS.delete(logDir, true);
+ }
+ }
+
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL_PEER.shutdownMiniHBaseCluster();
@@ -108,8 +153,6 @@ public class TestReplicationSource {
@Test
public void testLogMoving() throws Exception{
Path logPath = new Path(logDir, "log");
- if (!FS.exists(logDir)) FS.mkdirs(logDir);
- if (!FS.exists(oldLogDir)) FS.mkdirs(oldLogDir);
WALProvider.Writer writer = WALFactory.createWALWriter(FS, logPath,
TEST_UTIL.getConfiguration());
for(int i = 0; i < 3; i++) {
@@ -166,7 +209,6 @@ public class TestReplicationSource {
Configuration testConf = HBaseConfiguration.create();
testConf.setInt("replication.source.maxretriesmultiplier", 1);
ReplicationSourceManager manager =
Mockito.mock(ReplicationSourceManager.class);
- Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
source.init(testConf, null, manager, null, mockPeers, null, "testPeer",
null, replicationEndpoint, null);
ExecutorService executor = Executors.newSingleThreadExecutor();
@@ -189,10 +231,184 @@ public class TestReplicationSource {
}
+ private void appendEntries(WALProvider.Writer writer, int numEntries) throws
IOException {
+ for (int i = 0; i < numEntries; i++) {
+ byte[] b = Bytes.toBytes(Integer.toString(i));
+ KeyValue kv = new KeyValue(b,b,b);
+ WALEdit edit = new WALEdit();
+ edit.add(kv);
+ WALKey key = new WALKey(b, TableName.valueOf(b), 0, 0,
+ HConstants.DEFAULT_CLUSTER_ID);
+ NavigableMap<byte[], Integer> scopes = new TreeMap<byte[],
Integer>(Bytes.BYTES_COMPARATOR);
+ scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
+ key.setScopes(scopes);
+ writer.append(new WAL.Entry(key, edit));
+ writer.sync(false);
+ }
+ writer.close();
+ }
+
+ private long getPosition(WALFactory wals, Path log2, int numEntries) throws
IOException {
+ WAL.Reader reader = wals.createReader(FS, log2);
+ for (int i = 0; i < numEntries; i++) {
+ reader.next();
+ }
+ return reader.getPosition();
+ }
+
+ private static final class Mocks {
+ private final ReplicationSourceManager manager =
mock(ReplicationSourceManager.class);
+ private final ReplicationQueues queues = mock(ReplicationQueues.class);
+ private final ReplicationPeers peers = mock(ReplicationPeers.class);
+ private final MetricsSource metrics = mock(MetricsSource.class);
+ private final ReplicationPeer peer = mock(ReplicationPeer.class);
+ private final ReplicationEndpoint.Context context =
mock(ReplicationEndpoint.Context.class);
+
+ private Mocks() {
+ when(peers.getStatusOfPeer(anyString())).thenReturn(true);
+ when(context.getReplicationPeer()).thenReturn(peer);
+ when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+ }
+
+ ReplicationSource createReplicationSourceWithMocks(ReplicationEndpoint
endpoint)
+ throws IOException {
+ final ReplicationSource source = new ReplicationSource();
+ endpoint.init(context);
+ source.init(conf, FS, manager, queues, peers, mock(Stoppable.class),
+ "testPeerClusterZnode", UUID.randomUUID(), endpoint, metrics);
+ return source;
+ }
+ }
+
+ @Test
+ public void testSetLogPositionForWALCurrentlyReadingWhenLogsRolled() throws
Exception {
+ final int numWALEntries = 5;
+ conf.setInt("replication.source.nb.capacity", numWALEntries);
+
+ Mocks mocks = new Mocks();
+ final ReplicationEndpointForTest endpoint = new
ReplicationEndpointForTest() {
+ @Override
+ public WALEntryFilter getWALEntryfilter() {
+ return null;
+ }
+ };
+ WALFactory wals = new WALFactory(TEST_UTIL.getConfiguration(), null,
"test");
+ final Path log1 = new Path(logDir, "log.1");
+ final Path log2 = new Path(logDir, "log.2");
+
+ WALProvider.Writer writer1 = WALFactory.createWALWriter(FS, log1,
TEST_UTIL.getConfiguration());
+ WALProvider.Writer writer2 = WALFactory.createWALWriter(FS, log2,
TEST_UTIL.getConfiguration());
+
+ appendEntries(writer1, 3);
+ appendEntries(writer2, 2);
+
+ long pos = getPosition(wals, log2, 2);
+
+ final ReplicationSource source =
mocks.createReplicationSourceWithMocks(endpoint);
+ source.run();
+
+ source.enqueueLog(log1);
+ // log rolled
+ source.enqueueLog(log2);
+
+ Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+ @Override public boolean evaluate() throws Exception {
+ return endpoint.replicateCount.get() > 0;
+ }
+ });
+
+ ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
+ ArgumentCaptor<Long> positionCaptor = ArgumentCaptor.forClass(Long.class);
+ verify(mocks.manager, times(1))
+ .logPositionAndCleanOldLogs(pathCaptor.capture(), anyString(),
positionCaptor.capture(),
+ anyBoolean(), anyBoolean());
+ assertTrue(endpoint.lastEntries.size() == 5);
+ assertThat(pathCaptor.getValue(), is(log2));
+ assertThat(positionCaptor.getValue(), is(pos));
+ }
+
+ @Test
+ public void testSetLogPositionAndRemoveOldWALsEvenIfEmptyWALsRolled() throws
Exception {
+ Mocks mocks = new Mocks();
+
+ final ReplicationEndpointForTest endpoint = new
ReplicationEndpointForTest();
+ final ReplicationSource source =
mocks.createReplicationSourceWithMocks(endpoint);
+ WALFactory wals = new WALFactory(TEST_UTIL.getConfiguration(), null,
"test");
+
+ final Path log1 = new Path(logDir, "log.1");
+ final Path log2 = new Path(logDir, "log.2");
+
+ WALFactory.createWALWriter(FS, log1, TEST_UTIL.getConfiguration()).close();
+ WALFactory.createWALWriter(FS, log2, TEST_UTIL.getConfiguration()).close();
+ final long startPos = getPosition(wals, log2, 0);
+
+ source.run();
+ source.enqueueLog(log1);
+ source.enqueueLog(log2);
+
+ Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+ @Override public boolean evaluate() throws Exception {
+ return log2.equals(source.getLastLoggedPath())
+ && source.getLastLoggedPosition() >= startPos;
+ }
+ });
+
+ ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
+ ArgumentCaptor<Long> positionCaptor = ArgumentCaptor.forClass(Long.class);
+
+ verify(mocks.manager, times(1))
+ .logPositionAndCleanOldLogs(pathCaptor.capture(), anyString(),
positionCaptor.capture(),
+ anyBoolean(), anyBoolean());
+ assertThat(pathCaptor.getValue(), is(log2));
+ assertThat(positionCaptor.getValue(), is(startPos));
+ }
+
+ @Test
+ public void testSetLogPositionAndRemoveOldWALsEvenIfNoCfsReplicated() throws
Exception {
+ Mocks mocks = new Mocks();
+ // set table cfs to filter all cells out
+ final TableName replicatedTable = TableName.valueOf("replicated_table");
+ final Map<TableName, List<String>> cfs =
+ Collections.singletonMap(replicatedTable,
Collections.<String>emptyList());
+ when(mocks.peer.getTableCFs()).thenReturn(cfs);
+
+ WALFactory wals = new WALFactory(TEST_UTIL.getConfiguration(), null,
"test");
+ final Path log1 = new Path(logDir, "log.1");
+ final Path log2 = new Path(logDir, "log.2");
+
+ WALProvider.Writer writer1 = WALFactory.createWALWriter(FS, log1,
TEST_UTIL.getConfiguration());
+ WALProvider.Writer writer2 = WALFactory.createWALWriter(FS, log2,
TEST_UTIL.getConfiguration());
+
+ appendEntries(writer1, 3);
+ appendEntries(writer2, 2);
+ final long pos = getPosition(wals, log2, 2);
+
+ final ReplicationEndpointForTest endpoint = new
ReplicationEndpointForTest();
+ final ReplicationSource source =
mocks.createReplicationSourceWithMocks(endpoint);
+ source.enqueueLog(log1);
+ source.enqueueLog(log2);
+ source.run();
+ Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+ @Override public boolean evaluate() throws Exception {
+ // wait until reader read all cells
+ return log2.equals(source.getLastLoggedPath()) &&
source.getLastLoggedPosition() >= pos;
+ }
+ });
+
+ ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
+ ArgumentCaptor<Long> positionCaptor = ArgumentCaptor.forClass(Long.class);
+
+ // all old wals should be removed by updating wal position, even if all
cells are filtered out.
+ verify(mocks.manager, times(1))
+ .logPositionAndCleanOldLogs(pathCaptor.capture(), anyString(),
positionCaptor.capture(),
+ anyBoolean(), anyBoolean());
+ assertThat(pathCaptor.getValue(), is(log2));
+ assertThat(positionCaptor.getValue(), is(pos));
+ }
+
/**
* Tests that recovered queues are preserved on a regionserver shutdown.
* See HBASE-18192
- * @throws Exception
*/
@Test
public void testServerShutdownRecoveredQueue() throws Exception {
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 9f077da..7ad7260 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -23,17 +23,15 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -47,17 +45,22 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
+import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
+import org.apache.hadoop.hbase.replication.TableCfWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -77,7 +80,6 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
-import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
@@ -358,8 +360,9 @@ public class TestWALEntryStream {
// start up a batcher
ReplicationSourceManager mockSourceManager =
Mockito.mock(ReplicationSourceManager.class);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
- ReplicationSourceWALReaderThread batcher = new
ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(),walQueue, 0,
- fs, conf, getDummyFilter(), new MetricsSource("1"));
+ ReplicationSourceWALReaderThread batcher =
+ new ReplicationSourceWALReaderThread(mockSourceManager,
getQueueInfo(),walQueue, 0,
+ fs, conf, getDummyFilter(), new MetricsSource("1"));
Path walPath = walQueue.peek();
batcher.start();
WALEntryBatch entryBatch = batcher.take();
@@ -378,37 +381,36 @@ public class TestWALEntryStream {
}
@Test
- public void testReplicationSourceUpdatesLogPositionOnFilteredEntries()
throws Exception {
+ public void testReplicationSourceWALReaderThreadRecoveredQueue() throws
Exception {
appendEntriesToLog(3);
- // get ending position
+ log.rollWriter();
+ appendEntriesToLog(2);
+
long position;
- try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
+ try (WALEntryStream entryStream = new WALEntryStream(new
PriorityBlockingQueue<>(walQueue),
+ fs, conf, new MetricsSource("1"))) {
+ entryStream.next();
+ entryStream.next();
entryStream.next();
entryStream.next();
entryStream.next();
position = entryStream.getPosition();
}
- // start up a readerThread with a WALEntryFilter that always filter the
entries
- ReplicationSourceManager mockSourceManager =
Mockito.mock(ReplicationSourceManager.class);
+
+ ReplicationSourceManager mockSourceManager =
mock(ReplicationSourceManager.class);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
- ReplicationSourceWALReaderThread readerThread = new
ReplicationSourceWALReaderThread(
- mockSourceManager, getQueueInfo(), walQueue, 0, fs, conf, new
WALEntryFilter() {
- @Override
- public Entry filter(Entry entry) {
- return null;
- }
- }, new MetricsSource("1"));
- readerThread.start();
- Thread.sleep(100);
- ArgumentCaptor<Long> positionCaptor = ArgumentCaptor.forClass(Long.class);
- verify(mockSourceManager, times(3))
- .logPositionAndCleanOldLogs(any(Path.class),
- anyString(),
- positionCaptor.capture(),
- anyBoolean(),
- anyBoolean());
- assertEquals(position, positionCaptor.getValue().longValue());
+ ReplicationSourceWALReaderThread reader =
+ new ReplicationSourceWALReaderThread(mockSourceManager,
getRecoveredQueueInfo(),
+ walQueue, 0, fs, conf, getDummyFilter(), new
MetricsSource("1"));
+ Path walPath = walQueue.toArray(new Path[2])[1];
+ reader.start();
+ WALEntryBatch entryBatch = reader.take();
+
+ assertNotNull(entryBatch);
+ assertEquals(5, entryBatch.getWalEntries().size());
+ assertEquals(position, entryBatch.getLastWalPosition());
+ assertEquals(walPath, entryBatch.getLastWalPath());
+ assertFalse(entryBatch.hasMoreEntries());
}
@Test
@@ -436,6 +438,96 @@ public class TestWALEntryStream {
}
}
+ @Test
+ public void testReplicationSourceWALReaderThreadWithFilter() throws
Exception {
+ final byte[] notReplicatedCf = Bytes.toBytes("notReplicated");
+ final Map<TableName, List<String>> tableCfs = new HashMap<>();
+ tableCfs.put(tableName, Collections.singletonList(Bytes.toString(family)));
+ ReplicationPeer peer = mock(ReplicationPeer.class);
+ when(peer.getTableCFs()).thenReturn(tableCfs);
+ WALEntryFilter filter = new ChainWALEntryFilter(new
TableCfWALEntryFilter(peer));
+
+ // add filterable entries
+ appendToLogPlus(3, notReplicatedCf);
+ appendToLogPlus(3, notReplicatedCf);
+ appendToLogPlus(3, notReplicatedCf);
+
+ // add non-filterable entries
+ appendEntriesToLog(2);
+
+ ReplicationSourceManager mockSourceManager =
mock(ReplicationSourceManager.class);
+ when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+ final ReplicationSourceWALReaderThread reader =
+ new ReplicationSourceWALReaderThread(mockSourceManager,
getQueueInfo(), walQueue,
+ 0, fs, conf, filter, new MetricsSource("1"));
+ reader.start();
+
+ WALEntryBatch entryBatch = reader.take();
+
+ assertNotNull(entryBatch);
+ assertFalse(entryBatch.isEmpty());
+ List<Entry> walEntries = entryBatch.getWalEntries();
+ assertEquals(2, walEntries.size());
+ for (Entry entry : walEntries) {
+ ArrayList<Cell> cells = entry.getEdit().getCells();
+ assertTrue(cells.size() == 1);
+ assertTrue(CellUtil.matchingFamily(cells.get(0), family));
+ }
+ }
+
+ @Test
+ public void testReplicationSourceWALReaderThreadWithFilterWhenLogRolled()
throws Exception {
+ final byte[] notReplicatedCf = Bytes.toBytes("notReplicated");
+ final Map<TableName, List<String>> tableCfs = new HashMap<>();
+ tableCfs.put(tableName, Collections.singletonList(Bytes.toString(family)));
+ ReplicationPeer peer = mock(ReplicationPeer.class);
+ when(peer.getTableCFs()).thenReturn(tableCfs);
+ WALEntryFilter filter = new ChainWALEntryFilter(new
TableCfWALEntryFilter(peer));
+
+ appendToLogPlus(3, notReplicatedCf);
+
+ Path firstWAL = walQueue.peek();
+ final long eof = getPosition(firstWAL);
+
+ ReplicationSourceManager mockSourceManager =
mock(ReplicationSourceManager.class);
+ when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+ final ReplicationSourceWALReaderThread reader =
+ new ReplicationSourceWALReaderThread(mockSourceManager,
getQueueInfo(), walQueue,
+ 0, fs, conf, filter, new MetricsSource("1"));
+ reader.start();
+
+ // the reader won't put any batch, even if EOF is reached.
+ Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+ @Override public boolean evaluate() {
+ return reader.getLastReadPosition() >= eof;
+ }
+ });
+ assertNull(reader.poll(0));
+
+ log.rollWriter();
+
+ // should get an empty batch with the current WAL position after the WAL is rolled
+ WALEntryBatch entryBatch = reader.take();
+
+ Path lastWAL= walQueue.peek();
+ long positionToBeLogged = getPosition(lastWAL);
+
+ assertNotNull(entryBatch);
+ assertTrue(entryBatch.isEmpty());
+ assertEquals(1, walQueue.size());
+ assertNotEquals(firstWAL, entryBatch.getLastWalPath());
+ assertEquals(lastWAL, entryBatch.getLastWalPath());
+ assertEquals(positionToBeLogged, entryBatch.getLastWalPosition());
+ }
+
+ private long getPosition(Path walPath) throws IOException {
+ WALEntryStream entryStream =
+ new WALEntryStream(new
PriorityBlockingQueue<>(Collections.singletonList(walPath)),
+ fs, conf, new MetricsSource("1"));
+ entryStream.hasNext();
+ return entryStream.getPosition();
+ }
+
private String getRow(WAL.Entry entry) {
Cell cell = entry.getEdit().getCells().get(0);
return Bytes.toString(cell.getRowArray(), cell.getRowOffset(),
cell.getRowLength());
@@ -459,17 +551,25 @@ public class TestWALEntryStream {
}
private void appendToLogPlus(int count) throws IOException {
+ appendToLogPlus(count, family, qualifier);
+ }
+
+ private void appendToLogPlus(int count, byte[] cf) throws IOException {
+ appendToLogPlus(count, cf, qualifier);
+ }
+
+ private void appendToLogPlus(int count, byte[] cf, byte[] cq) throws
IOException {
final long txid = log.append(htd, info,
new WALKey(info.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc),
- getWALEdits(count), true);
+ getWALEdits(count, cf, cq), true);
log.sync(txid);
}
- private WALEdit getWALEdits(int count) {
+ private WALEdit getWALEdits(int count, byte[] cf, byte[] cq) {
WALEdit edit = new WALEdit();
for (int i = 0; i < count; i++) {
- edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family,
qualifier,
- System.currentTimeMillis(), qualifier));
+ edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), cf, cq,
+ System.currentTimeMillis(), cq));
}
return edit;
}
@@ -491,8 +591,16 @@ public class TestWALEntryStream {
};
}
+ private ReplicationQueueInfo getRecoveredQueueInfo() {
+ return getQueueInfo("1-1");
+ }
+
private ReplicationQueueInfo getQueueInfo() {
- return new ReplicationQueueInfo("1");
+ return getQueueInfo("1");
+ }
+
+ private ReplicationQueueInfo getQueueInfo(String znode) {
+ return new ReplicationQueueInfo(znode);
}
class PathWatcher extends WALActionsListener.Base {
@@ -505,5 +613,4 @@ public class TestWALEntryStream {
currentPath = newPath;
}
}
-
}