HBASE-18282 ReplicationLogCleaner can delete WALs not yet replicated in case of a KeeperException

Signed-off-by: Andrew Purtell <apurt...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ef847f84
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ef847f84
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ef847f84

Branch: refs/heads/branch-1.2
Commit: ef847f8417b0a300f242fe76769d46d7efb86570
Parents: 0f3bf54
Author: Ben Lau <ben...@oath.com>
Authored: Wed Feb 14 11:36:04 2018 -0800
Committer: Andrew Purtell <apurt...@apache.org>
Committed: Wed Feb 14 17:23:38 2018 -0800

----------------------------------------------------------------------
 .../hbase/replication/ReplicationQueues.java    |  3 +-
 .../ReplicationQueuesClientZKImpl.java          |  5 ++
 .../replication/ReplicationQueuesZKImpl.java    | 10 ++++
 .../replication/ReplicationStateZKBase.java     |  8 ++-
 .../cleaner/ReplicationZKLockCleanerChore.java  |  4 +-
 .../master/ReplicationLogCleaner.java           | 10 +++-
 .../hbase/master/cleaner/TestLogsCleaner.java   | 54 ++++++++++++++++----
 7 files changed, 79 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ef847f84/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
index 3dbbc33..f1457e0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
@@ -104,8 +104,9 @@ public interface ReplicationQueues {
    * Get a list of all region servers that have outstanding replication queues. These servers could
    * be alive, dead or from a previous run of the cluster.
    * @return a list of server names
+   * @throws ReplicationException
    */
-  List<String> getListOfReplicators();
+  List<String> getListOfReplicators() throws ReplicationException;
 
   /**
    * Checks if the provided znode is the same as this region server's

http://git-wip-us.apache.org/repos/asf/hbase/blob/ef847f84/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
index e1a6a49..93a932f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
@@ -47,6 +47,11 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem
   }
 
   @Override
+  public List<String> getListOfReplicators() throws KeeperException {
+    return super.getListOfReplicatorsZK();
+  }
+
+  @Override
   public List<String> getLogsInQueue(String serverName, String queueId) throws KeeperException {
     String znode = ZKUtil.joinZNode(this.queuesZNode, serverName);
     znode = ZKUtil.joinZNode(znode, queueId);

http://git-wip-us.apache.org/repos/asf/hbase/blob/ef847f84/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
index 35e5087..3085394 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
@@ -88,6 +88,16 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
   }
 
   @Override
+  public List<String> getListOfReplicators() throws ReplicationException {
+    try {
+      return super.getListOfReplicatorsZK();
+    } catch (KeeperException e) {
+      LOG.warn("getListOfReplicators() from ZK failed", e);
+      throw new ReplicationException("getListOfReplicators() from ZK failed", e);
+    }
+  }
+
+  @Override
   public void removeQueue(String queueId) {
     try {
       ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.myQueuesZnode, queueId));

http://git-wip-us.apache.org/repos/asf/hbase/blob/ef847f84/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
index 4fbac0f..75c13d8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
@@ -76,12 +76,18 @@ public abstract class ReplicationStateZKBase {
     this.queuesZNode = ZKUtil.joinZNode(replicationZNode, queuesZNodeName);
   }
 
-  public List<String> getListOfReplicators() {
+  /**
+   * Subclasses that use ZK explicitly can just call this directly while classes
+   * that are trying to hide internal details of storage can wrap the KeeperException
+   * into a ReplicationException or something else.
+   */
+  protected List<String> getListOfReplicatorsZK() throws KeeperException {
     List<String> result = null;
     try {
       result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.queuesZNode);
     } catch (KeeperException e) {
       this.abortable.abort("Failed to get list of replicators", e);
+      throw e;
     }
     return result;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ef847f84/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKLockCleanerChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKLockCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKLockCleanerChore.java
index 3fa30bf..7c50719 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKLockCleanerChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKLockCleanerChore.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.ScheduledChore;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
@@ -108,7 +109,8 @@ public class ReplicationZKLockCleanerChore extends ScheduledChore {
       }
     } catch (KeeperException e) {
       LOG.warn("zk operation interrupted", e);
+    } catch (ReplicationException e2) {
+      LOG.warn("replication exception", e2);
     }
-
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ef847f84/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
index 7731240..42d66a5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
@@ -156,6 +156,14 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
     }
   }
 
+  @VisibleForTesting
+  public void setConf(Configuration conf, ZooKeeperWatcher zk, 
+      ReplicationQueuesClient replicationQueuesClient) {
+    super.setConf(conf);
+    this.zkw = zk;
+    this.replicationQueues = replicationQueuesClient;
+  }
+
   @Override
   public void stop(String why) {
     if (this.stopped) return;
@@ -171,7 +179,7 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
     return this.stopped;
   }
 
-  private static class WarnOnlyAbortable implements Abortable {
+  public static class WarnOnlyAbortable implements Abortable {
 
     @Override
     public void abort(String why, Throwable e) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/ef847f84/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
index 8efa754..df5916c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.doAnswer;
 
 import java.io.IOException;
 import java.lang.reflect.Field;
@@ -29,6 +30,7 @@ import java.net.URLEncoder;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
@@ -55,12 +57,13 @@ import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 @Category(MediumTests.class)
 public class TestLogsCleaner {
@@ -177,13 +180,10 @@ public class TestLogsCleaner {
     cleaner.getDeletableFiles(new LinkedList<FileStatus>());
   }
 
-  /**
-   * ReplicationLogCleaner should be able to ride over ZooKeeper errors without
-   * aborting.
-   */
-  @Test
-  public void testZooKeeperAbort() throws Exception {
+  @Test(timeout=10000)
+  public void testZooKeeperAbortDuringGetListOfReplicators() throws Exception {
     Configuration conf = TEST_UTIL.getConfiguration();
+
     ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
 
     List<FileStatus> dummyFiles = Lists.newArrayList(
@@ -193,19 +193,51 @@ public class TestLogsCleaner {
 
     FaultyZooKeeperWatcher faultyZK =
         new FaultyZooKeeperWatcher(conf, "testZooKeeperAbort-faulty", null);
+    final AtomicBoolean getListOfReplicatorsFailed = new AtomicBoolean(false);
+
     try {
       faultyZK.init();
-      cleaner.setConf(conf, faultyZK);
+      ReplicationQueuesClient replicationQueuesClient = spy(ReplicationFactory.getReplicationQueuesClient(
+        faultyZK, conf, new ReplicationLogCleaner.WarnOnlyAbortable()));
+      doAnswer(new Answer<Object>() {
+        @Override
+        public Object answer(InvocationOnMock invocation) throws Throwable {
+          try {
+            return invocation.callRealMethod();
+          } catch (KeeperException.ConnectionLossException e) {
+            getListOfReplicatorsFailed.set(true);
+            throw e;
+          }
+        }
+      }).when(replicationQueuesClient).getListOfReplicators();
+      replicationQueuesClient.init();
+
+      cleaner.setConf(conf, faultyZK, replicationQueuesClient);
       // should keep all files due to a ConnectionLossException getting the queues znodes
       Iterable<FileStatus> toDelete = cleaner.getDeletableFiles(dummyFiles);
+
+      assertTrue(getListOfReplicatorsFailed.get());
       assertFalse(toDelete.iterator().hasNext());
       assertFalse(cleaner.isStopped());
     } finally {
       faultyZK.close();
     }
+  }
+
+  /**
+   * When zk is working both files should be returned
+   * @throws Exception
+   */
+  @Test(timeout=10000)
+  public void testZooKeeperNormal() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
 
-    // when zk is working both files should be returned
-    cleaner = new ReplicationLogCleaner();
+    List<FileStatus> dummyFiles = Lists.newArrayList(
+        new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log1")),
+        new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2"))
+    );
+    
     ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "testZooKeeperAbort-normal", null);
     try {
       cleaner.setConf(conf, zkw);
@@ -291,7 +323,7 @@ public class TestLogsCleaner {
     public void init() throws Exception {
       this.zk = spy(super.getRecoverableZooKeeper());
       doThrow(new KeeperException.ConnectionLossException())
-          .when(zk).getData("/hbase/replication/rs", null, new Stat());
+        .when(zk).getChildren("/hbase/replication/rs", null);
     }
 
     public RecoverableZooKeeper getRecoverableZooKeeper() {

Reply via email to