This is an automated email from the ASF dual-hosted git repository.

reidchan pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1 by this push:
     new e2b5c20  HBASE-25612 [branch-1] HMaster should abort if 
ReplicationLogCleaner is not able to delete oldWALs. (#3222)
e2b5c20 is described below

commit e2b5c20dc9c5395d3eb5a451ce07aa8faa6546e3
Author: Rushabh Shah <[email protected]>
AuthorDate: Thu May 6 11:29:19 2021 -0400

    HBASE-25612 [branch-1] HMaster should abort if ReplicationLogCleaner is not 
able to delete oldWALs. (#3222)
    
    Signed-off-by: Reid Chan <[email protected]>
---
 .../org/apache/hadoop/hbase/master/HMaster.java    |   6 +-
 .../hadoop/hbase/master/cleaner/LogCleaner.java    |   5 +-
 .../replication/master/ReplicationLogCleaner.java  |  88 +++++++++---------
 .../hbase/master/cleaner/TestLogsCleaner.java      | 103 +++++++++++++++++++--
 4 files changed, 147 insertions(+), 55 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 1342ce9..edee462 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -1286,16 +1286,16 @@ public class HMaster extends HRegionServer implements 
MasterServices, Server {
 
     // Create cleaner thread pool
     cleanerPool = new DirScanPool(conf);
+    Map<String, Object> params = new HashMap<String, Object>();
+    params.put(MASTER, this);
     // Start log cleaner thread
     int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 600 * 
1000);
     this.logCleaner = new LogCleaner(cleanerInterval, this, conf,
       getMasterFileSystem().getOldLogDir().getFileSystem(conf),
-      getMasterFileSystem().getOldLogDir(), cleanerPool);
+      getMasterFileSystem().getOldLogDir(), cleanerPool, params);
     getChoreService().scheduleChore(logCleaner);
    //start the hfile archive cleaner thread
     Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
-    Map<String, Object> params = new HashMap<String, Object>();
-    params.put(MASTER, this);
     this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf,
       getMasterFileSystem().getFileSystem(), archiveDir, cleanerPool, params);
     getChoreService().scheduleChore(hfileCleaner);
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
index 52ce68f..4280421 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
@@ -71,8 +72,8 @@ public class LogCleaner extends 
CleanerChore<BaseLogCleanerDelegate>
    * @param oldLogDir the path to the archived logs
    */
   public LogCleaner(final int p, final Stoppable s, Configuration conf, 
FileSystem fs,
-      Path oldLogDir, DirScanPool pool) {
-    super("LogsCleaner", p, s, conf, fs, oldLogDir, 
HBASE_MASTER_LOGCLEANER_PLUGINS, pool);
+      Path oldLogDir, DirScanPool pool, Map<String, Object> params) {
+    super("LogsCleaner", p, s, conf, fs, oldLogDir, 
HBASE_MASTER_LOGCLEANER_PLUGINS, pool, params);
     this.pendingDelete = new LinkedBlockingQueue<>();
     int size = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, 
DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
     this.oldWALsCleaner = createOldWalsCleaner(size);
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
index b27d8fa..7ac8489 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
@@ -18,14 +18,18 @@
  */
 package org.apache.hadoop.hbase.replication.master;
 
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
@@ -49,10 +53,11 @@ import org.apache.zookeeper.KeeperException;
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
 public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
   private static final Log LOG = 
LogFactory.getLog(ReplicationLogCleaner.class);
-  private ZooKeeperWatcher zkw;
+  private ZooKeeperWatcher zkw = null;
   private ReplicationQueuesClient replicationQueues;
   private boolean stopped = false;
-
+  private MasterServices master;
+  private boolean shareZK = false;
 
   @Override
   public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
@@ -68,7 +73,7 @@ public class ReplicationLogCleaner extends 
BaseLogCleanerDelegate {
       // but they won't be deleted because they're not in the checking set.
       wals = loadWALsFromQueues();
     } catch (KeeperException e) {
-      LOG.warn("Failed to read zookeeper, skipping checking deletable files");
+      LOG.warn("Failed to read zookeeper, skipping checking deletable files", 
e);
       return Collections.emptyList();
     }
     return Iterables.filter(files, new Predicate<FileStatus>() {
@@ -132,42 +137,57 @@ public class ReplicationLogCleaner extends 
BaseLogCleanerDelegate {
       LOG.warn("Not configured - allowing all wals to be deleted");
       return;
     }
-    // Make my own Configuration.  Then I'll have my own connection to zk that
-    // I can close myself when comes time.
-    Configuration conf = new Configuration(config);
-    try {
-      setConf(conf, new ZooKeeperWatcher(conf, "replicationLogCleaner", null));
-    } catch (IOException e) {
-      LOG.error("Error while configuring " + this.getClass().getName(), e);
-    }
+    super.setConf(config);
   }
 
-  @InterfaceAudience.Private
-  public void setConf(Configuration conf, ZooKeeperWatcher zk) {
-    super.setConf(conf);
-    try {
-      this.zkw = zk;
-      this.replicationQueues = 
ReplicationFactory.getReplicationQueuesClient(zkw, conf,
-          new WarnOnlyAbortable());
-      this.replicationQueues.init();
-    } catch (ReplicationException e) {
-      LOG.error("Error while configuring " + this.getClass().getName(), e);
+  @Override
+  public void init(Map<String, Object> params) {
+    if (getConf() == null) {
+      // Replication is disabled so do nothing.
+      return;
+    }
+
+    if (MapUtils.isNotEmpty(params)) {
+      Object master = params.get(HMaster.MASTER);
+      if (master != null && master instanceof HMaster) {
+        this.master = (HMaster)master;
+        zkw = ((HMaster) master).getZooKeeper();
+        shareZK = true;
+      }
     }
+    init(getConf(), this.zkw, null);
   }
 
   @InterfaceAudience.Private
-  public void setConf(Configuration conf, ZooKeeperWatcher zk, 
+  public void init(Configuration conf, ZooKeeperWatcher zk,
       ReplicationQueuesClient replicationQueuesClient) {
     super.setConf(conf);
-    this.zkw = zk;
-    this.replicationQueues = replicationQueuesClient;
+    try {
+      if (zk != null) {
+        this.zkw = zk;
+      } else {
+        this.zkw = new ZooKeeperWatcher(getConf(), "replicationLogCleaner", 
null);
+      }
+      Preconditions.checkNotNull(this.zkw, "Zookeeper watcher cannot be null");
+      if (replicationQueuesClient != null) {
+        this.replicationQueues = replicationQueuesClient;
+      } else {
+        this.replicationQueues =
+          ReplicationFactory.getReplicationQueuesClient(zkw, getConf(), 
master);
+        this.replicationQueues.init();
+      }
+      Preconditions.checkNotNull(this.replicationQueues,
+        "ReplicationQueues cannot be null");
+    } catch (IOException | ReplicationException e) {
+      LOG.error("Error while configuring " + this.getClass().getName(), e);
+    }
   }
 
   @Override
   public void stop(String why) {
     if (this.stopped) return;
     this.stopped = true;
-    if (this.zkw != null) {
+    if (!shareZK && this.zkw != null) {
       LOG.info("Stopping " + this.zkw);
       this.zkw.close();
     }
@@ -177,20 +197,4 @@ public class ReplicationLogCleaner extends 
BaseLogCleanerDelegate {
   public boolean isStopped() {
     return this.stopped;
   }
-
-  public static class WarnOnlyAbortable implements Abortable {
-
-    @Override
-    public void abort(String why, Throwable e) {
-      LOG.warn("ReplicationLogCleaner received abort, ignoring.  Reason: " + 
why);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(e);
-      }
-    }
-
-    @Override
-    public boolean isAborted() {
-      return false;
-    }
-  }
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
index b295484..39cbc96 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
@@ -19,17 +19,25 @@ package org.apache.hadoop.hbase.master.cleaner;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.URLEncoder;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -44,11 +52,11 @@ import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
@@ -59,10 +67,13 @@ import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -151,7 +162,7 @@ public class TestLogsCleaner {
 
     assertEquals(34, fs.listStatus(oldLogDir).length);
 
-    LogCleaner cleaner  = new LogCleaner(1000, server, conf, fs, oldLogDir, 
POOL);
+    LogCleaner cleaner  = new LogCleaner(1000, server, conf, fs, oldLogDir, 
POOL, null);
 
     cleaner.chore();
 
@@ -175,7 +186,7 @@ public class TestLogsCleaner {
     ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
     cleaner.setConf(conf);
 
-    ReplicationQueuesClient rqcMock = 
Mockito.mock(ReplicationQueuesClient.class);
+    ReplicationQueuesClient rqcMock = mock(ReplicationQueuesClient.class);
     Mockito.when(rqcMock.getQueuesZNodeCversion()).thenReturn(1, 2, 3, 4);
 
     Field rqc = 
ReplicationLogCleaner.class.getDeclaredField("replicationQueues");
@@ -201,11 +212,12 @@ public class TestLogsCleaner {
     FaultyZooKeeperWatcher faultyZK =
         new FaultyZooKeeperWatcher(conf, "testZooKeeperAbort-faulty", null);
     final AtomicBoolean getListOfReplicatorsFailed = new AtomicBoolean(false);
+    TestAbortable abortable = new TestAbortable();
 
     try {
       faultyZK.init();
       ReplicationQueuesClient replicationQueuesClient = 
spy(ReplicationFactory.getReplicationQueuesClient(
-        faultyZK, conf, new ReplicationLogCleaner.WarnOnlyAbortable()));
+        faultyZK, conf, abortable));
       doAnswer(new Answer<Object>() {
         @Override
         public Object answer(InvocationOnMock invocation) throws Throwable {
@@ -219,11 +231,12 @@ public class TestLogsCleaner {
       }).when(replicationQueuesClient).getListOfReplicators();
       replicationQueuesClient.init();
 
-      cleaner.setConf(conf, faultyZK, replicationQueuesClient);
+      cleaner.init(conf, faultyZK, replicationQueuesClient);
       // should keep all files due to a ConnectionLossException getting the 
queues znodes
       Iterable<FileStatus> toDelete = cleaner.getDeletableFiles(dummyFiles);
 
       assertTrue(getListOfReplicatorsFailed.get());
+      assertTrue(abortable.isAborted());
       assertFalse(toDelete.iterator().hasNext());
       assertFalse(cleaner.isStopped());
     } finally {
@@ -247,7 +260,7 @@ public class TestLogsCleaner {
 
     ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, 
"testZooKeeperAbort-normal", null);
     try {
-      cleaner.setConf(conf, zkw);
+      cleaner.init(conf, zkw, null);
       Iterable<FileStatus> filesToDelete = 
cleaner.getDeletableFiles(dummyFiles);
       Iterator<FileStatus> iter = filesToDelete.iterator();
       assertTrue(iter.hasNext());
@@ -274,7 +287,7 @@ public class TestLogsCleaner {
     Path oldWALsDir = new Path(TEST_UTIL.getDefaultRootDirPath(),
         HConstants.HREGION_OLDLOGDIR_NAME);
     FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
-    final LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, 
oldWALsDir, POOL);
+    final LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, 
oldWALsDir, POOL, null);
     assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE, 
cleaner.getSizeOfCleaners());
     assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
         cleaner.getCleanerThreadTimeoutMsec());
@@ -391,7 +404,7 @@ public class TestLogsCleaner {
     private RecoverableZooKeeper zk;
 
     public FaultyZooKeeperWatcher(Configuration conf, String identifier, 
Abortable abortable)
-        throws ZooKeeperConnectionException, IOException {
+        throws IOException {
       super(conf, identifier, abortable);
     }
 
@@ -405,4 +418,78 @@ public class TestLogsCleaner {
       return zk;
     }
   }
+
+  /**
+   * An {@link Abortable} implementation for tests.
+   */
+  class TestAbortable implements Abortable {
+    private volatile boolean aborted = false;
+
+    @Override
+    public void abort(String why, Throwable e) {
+      this.aborted = true;
+    }
+
+    @Override
+    public boolean isAborted() {
+      return this.aborted;
+    }
+  }
+
+  /**
+   * Throw SessionExpiredException when zk#getData is called.
+   */
+  static class SessionExpiredZooKeeperWatcher extends ZooKeeperWatcher {
+    private RecoverableZooKeeper zk;
+
+    public SessionExpiredZooKeeperWatcher(Configuration conf, String 
identifier,
+                                          Abortable abortable) throws 
IOException {
+      super(conf, identifier, abortable);
+    }
+
+    public void init() throws Exception {
+      this.zk = spy(super.getRecoverableZooKeeper());
+      doThrow(new KeeperException.SessionExpiredException())
+        .when(zk).getData(Mockito.anyString(), Mockito.any(Watcher.class), 
Mockito.any(Stat.class));
+    }
+
+    @Override
+    public RecoverableZooKeeper getRecoverableZooKeeper() {
+      return zk;
+    }
+  }
+
+  /**
+   * Tests that HMaster#abort will be called if ReplicationLogCleaner
+   * encounters SessionExpiredException which is unrecoverable.
+   * @throws Exception Exception
+   */
+  @Test
+  public void testZookeeperSessionExpired() throws Exception {
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    try(SessionExpiredZooKeeperWatcher sessionExpiredZK =
+          new SessionExpiredZooKeeperWatcher(conf, 
"testSessionExpiredZk-faulty", null)) {
+      sessionExpiredZK.init();
+      ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
+      cleaner.setConf(conf);
+      // Mock HMaster
+      HMaster master  = mock(HMaster.class);
+      // Return SessionExpired Zookeeper.
+      doReturn(sessionExpiredZK).when(master).getZooKeeper();
+      doNothing().when(master).abort(Mockito.anyString(), 
Mockito.any(Throwable.class));
+      Map<String, Object> params = new HashMap<>();
+      params.put(HMaster.MASTER, master);
+      cleaner.init(params);
+      // This will throw SessionExpiredException
+      cleaner.getDeletableFiles(new LinkedList<FileStatus>());
+      // make sure that HMaster#abort was called.
+      ArgumentCaptor<Throwable> throwableCaptor = 
ArgumentCaptor.forClass(Throwable.class);
+
+      verify(master, times(1))
+        .abort(Mockito.anyString(), throwableCaptor.capture());
+      assertNotNull(throwableCaptor.getValue());
+      assertTrue("Should be SessionExpiredException",
+        throwableCaptor.getValue() instanceof 
KeeperException.SessionExpiredException);
+    }
+  }
 }

Reply via email to