This is an automated email from the ASF dual-hosted git repository.
apurtell pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-1 by this push:
new 9821fd8 HBASE-22784 OldWALs not cleared in a replication slave
cluster (cyclic replication bw 2 clusters)
9821fd8 is described below
commit 9821fd8ffb573696c07caa9a5ec3a656592dedee
Author: Wellington Chevreuil <[email protected]>
AuthorDate: Thu Aug 8 12:19:09 2019 +0100
HBASE-22784 OldWALs not cleared in a replication slave cluster (cyclic
replication bw 2 clusters)
Signed-off-by: Andrew Purtell <[email protected]>
---
.../regionserver/ReplicationSource.java | 1 +
.../regionserver/ReplicationSourceManager.java | 30 ++++++++++------
.../ReplicationSourceWALReaderThread.java | 9 +++++
.../regionserver/TestWALEntryStream.java | 40 ++++++++++++++++++++++
4 files changed, 70 insertions(+), 10 deletions(-)
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index ad941e0..c7780bb 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -783,6 +783,7 @@ public class ReplicationSource extends Thread implements
ReplicationSourceInterf
}
private void updateLogPosition(long lastReadPosition) {
+ manager.setPendingShipment(false);
manager.logPositionAndCleanOldLogs(currentPath, peerClusterZnode,
lastReadPosition,
this.replicationQueueInfo.isQueueRecovered(), false);
lastLoggedPosition = lastReadPosition;
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index dbe9e63..6b8b6e2 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -121,9 +121,10 @@ public class ReplicationSourceManager implements
ReplicationListener {
private final Random rand;
private final boolean replicationForBulkLoadDataEnabled;
-
private AtomicLong totalBufferUsed = new AtomicLong();
+ private boolean pendingShipment;
+
/**
* Creates a replication manager and sets the watch on all the other
registered region servers
* @param replicationQueues the interface for manipulating replication queues
@@ -189,14 +190,20 @@ public class ReplicationSourceManager implements
ReplicationListener {
* @param queueRecovered indicates if this queue comes from another region
server
* @param holdLogInZK if true then the log is retained in ZK
*/
- public void logPositionAndCleanOldLogs(Path log, String id, long position,
+ public synchronized void logPositionAndCleanOldLogs(Path log, String id,
long position,
boolean queueRecovered, boolean holdLogInZK) {
- String fileName = log.getName();
- this.replicationQueues.setLogPosition(id, fileName, position);
- if (holdLogInZK) {
- return;
+ if (!this.pendingShipment) {
+ String fileName = log.getName();
+ this.replicationQueues.setLogPosition(id, fileName, position);
+ if (holdLogInZK) {
+ return;
+ }
+ cleanOldLogs(fileName, id, queueRecovered);
}
- cleanOldLogs(fileName, id, queueRecovered);
+ }
+
+ public synchronized void setPendingShipment(boolean pendingShipment) {
+ this.pendingShipment = pendingShipment;
}
/**
@@ -209,9 +216,12 @@ public class ReplicationSourceManager implements
ReplicationListener {
public void cleanOldLogs(String key, String id, boolean queueRecovered) {
String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(key);
if (queueRecovered) {
- SortedSet<String> wals = walsByIdRecoveredQueues.get(id).get(logPrefix);
- if (wals != null && !wals.first().equals(key)) {
- cleanOldLogs(wals, key, id);
+ Map<String, SortedSet<String>> walsForPeer =
walsByIdRecoveredQueues.get(id);
+ if(walsForPeer != null) {
+ SortedSet<String> wals = walsForPeer.get(logPrefix);
+ if (wals != null && !wals.first().equals(key)) {
+ cleanOldLogs(wals, key, id);
+ }
}
} else {
synchronized (this.walsById) {
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
index ec5e862..1d94a7a 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
@@ -81,6 +81,8 @@ public class ReplicationSourceWALReaderThread extends Thread {
private AtomicLong totalBufferUsed;
private long totalBufferQuota;
+ private ReplicationSourceManager replicationSourceManager;
+
/**
* Creates a reader worker for a given WAL queue. Reads WAL entries off a
given queue, batches the
* entries, and puts them on a batch queue.
@@ -109,6 +111,7 @@ public class ReplicationSourceWALReaderThread extends
Thread {
// memory used will be batchSizeCapacity * (nb.batches + 1)
// the +1 is for the current thread reading before placing onto the queue
int batchCount = conf.getInt("replication.source.nb.batches", 1);
+ this.replicationSourceManager = manager;
this.totalBufferUsed = manager.getTotalBufferUsed();
this.totalBufferQuota =
conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
@@ -148,6 +151,7 @@ public class ReplicationSourceWALReaderThread extends
Thread {
long entrySize = getEntrySizeIncludeBulkLoad(entry);
long entrySizeExlucdeBulkLoad =
getEntrySizeExcludeBulkLoad(entry);
batch.addEntry(entry);
+ replicationSourceManager.setPendingShipment(true);
updateBatchStats(batch, entry, entryStream.getPosition(),
entrySize);
boolean totalBufferTooLarge =
acquireBufferQuota(entrySizeExlucdeBulkLoad);
// Stop if too many entries or too big
@@ -156,6 +160,11 @@ public class ReplicationSourceWALReaderThread extends
Thread {
break;
}
}
+ } else {
+
replicationSourceManager.logPositionAndCleanOldLogs(entryStream.getCurrentPath(),
+ this.replicationQueueInfo.getPeerClusterZnode(),
+ entryStream.getPosition(),
+ this.replicationQueueInfo.isQueueRecovered(), false);
}
}
if (batch != null && (!batch.getLastSeqIds().isEmpty() ||
batch.getNbEntries() > 0)) {
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index a409fae..9f077da 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -25,6 +25,11 @@ import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
@@ -72,6 +77,7 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
@@ -372,6 +378,40 @@ public class TestWALEntryStream {
}
@Test
+ public void testReplicationSourceUpdatesLogPositionOnFilteredEntries()
throws Exception {
+ appendEntriesToLog(3);
+ // get ending position
+ long position;
+ try (WALEntryStream entryStream =
+ new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
+ entryStream.next();
+ entryStream.next();
+ entryStream.next();
+ position = entryStream.getPosition();
+ }
+ // start up a readerThread with a WALEntryFilter that always filters the
entries
+ ReplicationSourceManager mockSourceManager =
Mockito.mock(ReplicationSourceManager.class);
+ when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+ ReplicationSourceWALReaderThread readerThread = new
ReplicationSourceWALReaderThread(
+ mockSourceManager, getQueueInfo(), walQueue, 0, fs, conf, new
WALEntryFilter() {
+ @Override
+ public Entry filter(Entry entry) {
+ return null;
+ }
+ }, new MetricsSource("1"));
+ readerThread.start();
+ Thread.sleep(100);
+ ArgumentCaptor<Long> positionCaptor = ArgumentCaptor.forClass(Long.class);
+ verify(mockSourceManager, times(3))
+ .logPositionAndCleanOldLogs(any(Path.class),
+ anyString(),
+ positionCaptor.capture(),
+ anyBoolean(),
+ anyBoolean());
+ assertEquals(position, positionCaptor.getValue().longValue());
+ }
+
+ @Test
public void testWALKeySerialization() throws Exception {
Map<String, byte[]> attributes = new HashMap<String, byte[]>();
attributes.put("foo", Bytes.toBytes("foo-value"));