This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1 by this push:
     new 9821fd8  HBASE-22784 OldWALs not cleared in a replication slave 
cluster (cyclic replication bw 2 clusters)
9821fd8 is described below

commit 9821fd8ffb573696c07caa9a5ec3a656592dedee
Author: Wellington Chevreuil <[email protected]>
AuthorDate: Thu Aug 8 12:19:09 2019 +0100

    HBASE-22784 OldWALs not cleared in a replication slave cluster (cyclic 
replication bw 2 clusters)
    
    Signed-off-by: Andrew Purtell <[email protected]>
---
 .../regionserver/ReplicationSource.java            |  1 +
 .../regionserver/ReplicationSourceManager.java     | 30 ++++++++++------
 .../ReplicationSourceWALReaderThread.java          |  9 +++++
 .../regionserver/TestWALEntryStream.java           | 40 ++++++++++++++++++++++
 4 files changed, 70 insertions(+), 10 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index ad941e0..c7780bb 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -783,6 +783,7 @@ public class ReplicationSource extends Thread implements 
ReplicationSourceInterf
     }
 
     private void updateLogPosition(long lastReadPosition) {
+      manager.setPendingShipment(false);
       manager.logPositionAndCleanOldLogs(currentPath, peerClusterZnode, 
lastReadPosition,
         this.replicationQueueInfo.isQueueRecovered(), false);
       lastLoggedPosition = lastReadPosition;
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index dbe9e63..6b8b6e2 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -121,9 +121,10 @@ public class ReplicationSourceManager implements 
ReplicationListener {
   private final Random rand;
   private final boolean replicationForBulkLoadDataEnabled;
 
-
   private AtomicLong totalBufferUsed = new AtomicLong();
 
+  private boolean pendingShipment;
+
   /**
    * Creates a replication manager and sets the watch on all the other 
registered region servers
    * @param replicationQueues the interface for manipulating replication queues
@@ -189,14 +190,20 @@ public class ReplicationSourceManager implements 
ReplicationListener {
    * @param queueRecovered indicates if this queue comes from another region 
server
    * @param holdLogInZK if true then the log is retained in ZK
    */
-  public void logPositionAndCleanOldLogs(Path log, String id, long position,
+  public synchronized void logPositionAndCleanOldLogs(Path log, String id, 
long position,
       boolean queueRecovered, boolean holdLogInZK) {
-    String fileName = log.getName();
-    this.replicationQueues.setLogPosition(id, fileName, position);
-    if (holdLogInZK) {
-     return;
+    if (!this.pendingShipment) {
+      String fileName = log.getName();
+      this.replicationQueues.setLogPosition(id, fileName, position);
+      if (holdLogInZK) {
+        return;
+      }
+      cleanOldLogs(fileName, id, queueRecovered);
     }
-    cleanOldLogs(fileName, id, queueRecovered);
+  }
+
+  public synchronized void setPendingShipment(boolean pendingShipment) {
+    this.pendingShipment = pendingShipment;
   }
 
   /**
@@ -209,9 +216,12 @@ public class ReplicationSourceManager implements 
ReplicationListener {
   public void cleanOldLogs(String key, String id, boolean queueRecovered) {
     String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(key);
     if (queueRecovered) {
-      SortedSet<String> wals = walsByIdRecoveredQueues.get(id).get(logPrefix);
-      if (wals != null && !wals.first().equals(key)) {
-        cleanOldLogs(wals, key, id);
+      Map<String, SortedSet<String>> walsForPeer = 
walsByIdRecoveredQueues.get(id);
+      if(walsForPeer != null) {
+        SortedSet<String> wals = walsForPeer.get(logPrefix);
+        if (wals != null && !wals.first().equals(key)) {
+          cleanOldLogs(wals, key, id);
+        }
       }
     } else {
       synchronized (this.walsById) {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
index ec5e862..1d94a7a 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
@@ -81,6 +81,8 @@ public class ReplicationSourceWALReaderThread extends Thread {
   private AtomicLong totalBufferUsed;
   private long totalBufferQuota;
 
+  private ReplicationSourceManager replicationSourceManager;
+
   /**
    * Creates a reader worker for a given WAL queue. Reads WAL entries off a 
given queue, batches the
    * entries, and puts them on a batch queue.
@@ -109,6 +111,7 @@ public class ReplicationSourceWALReaderThread extends 
Thread {
     // memory used will be batchSizeCapacity * (nb.batches + 1)
     // the +1 is for the current thread reading before placing onto the queue
     int batchCount = conf.getInt("replication.source.nb.batches", 1);
+    this.replicationSourceManager = manager;
     this.totalBufferUsed = manager.getTotalBufferUsed();
     this.totalBufferQuota = 
conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
       HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
@@ -148,6 +151,7 @@ public class ReplicationSourceWALReaderThread extends 
Thread {
                 long entrySize = getEntrySizeIncludeBulkLoad(entry);
                 long entrySizeExlucdeBulkLoad = 
getEntrySizeExcludeBulkLoad(entry);
                 batch.addEntry(entry);
+                replicationSourceManager.setPendingShipment(true);
                 updateBatchStats(batch, entry, entryStream.getPosition(), 
entrySize);
                 boolean totalBufferTooLarge = 
acquireBufferQuota(entrySizeExlucdeBulkLoad);
                 // Stop if too many entries or too big
@@ -156,6 +160,11 @@ public class ReplicationSourceWALReaderThread extends 
Thread {
                   break;
                 }
               }
+            } else {
+              
replicationSourceManager.logPositionAndCleanOldLogs(entryStream.getCurrentPath(),
+                this.replicationQueueInfo.getPeerClusterZnode(),
+                entryStream.getPosition(),
+                this.replicationQueueInfo.isQueueRecovered(), false);
             }
           }
           if (batch != null && (!batch.getLastSeqIds().isEmpty() || 
batch.getNbEntries() > 0)) {
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index a409fae..9f077da 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -25,6 +25,11 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
@@ -72,6 +77,7 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 import org.mockito.runners.MockitoJUnitRunner;
 
@@ -372,6 +378,40 @@ public class TestWALEntryStream {
   }
 
   @Test
+  public void testReplicationSourceUpdatesLogPositionOnFilteredEntries() 
throws Exception {
+    appendEntriesToLog(3);
+    // get ending position
+    long position;
+    try (WALEntryStream entryStream =
+      new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
+      entryStream.next();
+      entryStream.next();
+      entryStream.next();
+      position = entryStream.getPosition();
+    }
+    // start up a readerThread with a WALEntryFilter that always filters the 
entries
+    ReplicationSourceManager mockSourceManager = 
Mockito.mock(ReplicationSourceManager.class);
+    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+    ReplicationSourceWALReaderThread readerThread = new 
ReplicationSourceWALReaderThread(
+      mockSourceManager, getQueueInfo(), walQueue, 0, fs, conf, new 
WALEntryFilter() {
+        @Override
+        public Entry filter(Entry entry) {
+          return null;
+        }
+      }, new MetricsSource("1"));
+    readerThread.start();
+    Thread.sleep(100);
+    ArgumentCaptor<Long> positionCaptor = ArgumentCaptor.forClass(Long.class);
+    verify(mockSourceManager, times(3))
+      .logPositionAndCleanOldLogs(any(Path.class),
+        anyString(),
+        positionCaptor.capture(),
+        anyBoolean(),
+        anyBoolean());
+    assertEquals(position, positionCaptor.getValue().longValue());
+  }
+
+  @Test
   public void testWALKeySerialization() throws Exception {
     Map<String, byte[]> attributes = new HashMap<String, byte[]>();
     attributes.put("foo", Bytes.toBytes("foo-value"));

Reply via email to