This is an automated email from the ASF dual-hosted git repository.
reidchan pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-1 by this push:
new e2b5c20 HBASE-25612 [branch-1] HMaster should abort if
ReplicationLogCleaner is not able to delete oldWALs. (#3222)
e2b5c20 is described below
commit e2b5c20dc9c5395d3eb5a451ce07aa8faa6546e3
Author: Rushabh Shah <[email protected]>
AuthorDate: Thu May 6 11:29:19 2021 -0400
HBASE-25612 [branch-1] HMaster should abort if ReplicationLogCleaner is not
able to delete oldWALs. (#3222)
Signed-off-by: Reid Chan <[email protected]>
---
.../org/apache/hadoop/hbase/master/HMaster.java | 6 +-
.../hadoop/hbase/master/cleaner/LogCleaner.java | 5 +-
.../replication/master/ReplicationLogCleaner.java | 88 +++++++++---------
.../hbase/master/cleaner/TestLogsCleaner.java | 103 +++++++++++++++++++--
4 files changed, 147 insertions(+), 55 deletions(-)
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 1342ce9..edee462 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -1286,16 +1286,16 @@ public class HMaster extends HRegionServer implements
MasterServices, Server {
// Create cleaner thread pool
cleanerPool = new DirScanPool(conf);
+ Map<String, Object> params = new HashMap<String, Object>();
+ params.put(MASTER, this);
// Start log cleaner thread
int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 600 *
1000);
this.logCleaner = new LogCleaner(cleanerInterval, this, conf,
getMasterFileSystem().getOldLogDir().getFileSystem(conf),
- getMasterFileSystem().getOldLogDir(), cleanerPool);
+ getMasterFileSystem().getOldLogDir(), cleanerPool, params);
getChoreService().scheduleChore(logCleaner);
//start the hfile archive cleaner thread
Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
- Map<String, Object> params = new HashMap<String, Object>();
- params.put(MASTER, this);
this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf,
getMasterFileSystem().getFileSystem(), archiveDir, cleanerPool, params);
getChoreService().scheduleChore(hfileCleaner);
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
index 52ce68f..4280421 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -71,8 +72,8 @@ public class LogCleaner extends
CleanerChore<BaseLogCleanerDelegate>
* @param oldLogDir the path to the archived logs
*/
public LogCleaner(final int p, final Stoppable s, Configuration conf,
FileSystem fs,
- Path oldLogDir, DirScanPool pool) {
- super("LogsCleaner", p, s, conf, fs, oldLogDir,
HBASE_MASTER_LOGCLEANER_PLUGINS, pool);
+ Path oldLogDir, DirScanPool pool, Map<String, Object> params) {
+ super("LogsCleaner", p, s, conf, fs, oldLogDir,
HBASE_MASTER_LOGCLEANER_PLUGINS, pool, params);
this.pendingDelete = new LinkedBlockingQueue<>();
int size = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE,
DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
this.oldWALsCleaner = createOldWalsCleaner(size);
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
index b27d8fa..7ac8489 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
@@ -18,14 +18,18 @@
*/
package org.apache.hadoop.hbase.replication.master;
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import org.apache.commons.collections.MapUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
@@ -49,10 +53,11 @@ import org.apache.zookeeper.KeeperException;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
private static final Log LOG =
LogFactory.getLog(ReplicationLogCleaner.class);
- private ZooKeeperWatcher zkw;
+ private ZooKeeperWatcher zkw = null;
private ReplicationQueuesClient replicationQueues;
private boolean stopped = false;
-
+ private MasterServices master;
+ private boolean shareZK = false;
@Override
public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
@@ -68,7 +73,7 @@ public class ReplicationLogCleaner extends
BaseLogCleanerDelegate {
// but they won't be deleted because they're not in the checking set.
wals = loadWALsFromQueues();
} catch (KeeperException e) {
- LOG.warn("Failed to read zookeeper, skipping checking deletable files");
+ LOG.warn("Failed to read zookeeper, skipping checking deletable files",
e);
return Collections.emptyList();
}
return Iterables.filter(files, new Predicate<FileStatus>() {
@@ -132,42 +137,57 @@ public class ReplicationLogCleaner extends
BaseLogCleanerDelegate {
LOG.warn("Not configured - allowing all wals to be deleted");
return;
}
- // Make my own Configuration. Then I'll have my own connection to zk that
- // I can close myself when comes time.
- Configuration conf = new Configuration(config);
- try {
- setConf(conf, new ZooKeeperWatcher(conf, "replicationLogCleaner", null));
- } catch (IOException e) {
- LOG.error("Error while configuring " + this.getClass().getName(), e);
- }
+ super.setConf(config);
}
- @InterfaceAudience.Private
- public void setConf(Configuration conf, ZooKeeperWatcher zk) {
- super.setConf(conf);
- try {
- this.zkw = zk;
- this.replicationQueues =
ReplicationFactory.getReplicationQueuesClient(zkw, conf,
- new WarnOnlyAbortable());
- this.replicationQueues.init();
- } catch (ReplicationException e) {
- LOG.error("Error while configuring " + this.getClass().getName(), e);
+ @Override
+ public void init(Map<String, Object> params) {
+ if (getConf() == null) {
+ // Replication is disabled so do nothing.
+ return;
+ }
+
+ if (MapUtils.isNotEmpty(params)) {
+ Object master = params.get(HMaster.MASTER);
+ if (master != null && master instanceof HMaster) {
+ this.master = (HMaster)master;
+ zkw = ((HMaster) master).getZooKeeper();
+ shareZK = true;
+ }
}
+ init(getConf(), this.zkw, null);
}
@InterfaceAudience.Private
- public void setConf(Configuration conf, ZooKeeperWatcher zk,
+ public void init(Configuration conf, ZooKeeperWatcher zk,
ReplicationQueuesClient replicationQueuesClient) {
super.setConf(conf);
- this.zkw = zk;
- this.replicationQueues = replicationQueuesClient;
+ try {
+ if (zk != null) {
+ this.zkw = zk;
+ } else {
+ this.zkw = new ZooKeeperWatcher(getConf(), "replicationLogCleaner",
null);
+ }
+ Preconditions.checkNotNull(this.zkw, "Zookeeper watcher cannot be null");
+ if (replicationQueuesClient != null) {
+ this.replicationQueues = replicationQueuesClient;
+ } else {
+ this.replicationQueues =
+ ReplicationFactory.getReplicationQueuesClient(zkw, getConf(),
master);
+ this.replicationQueues.init();
+ }
+ Preconditions.checkNotNull(this.replicationQueues,
+ "ReplicationQueues cannot be null");
+ } catch (IOException | ReplicationException e) {
+ LOG.error("Error while configuring " + this.getClass().getName(), e);
+ }
}
@Override
public void stop(String why) {
if (this.stopped) return;
this.stopped = true;
- if (this.zkw != null) {
+ if (!shareZK && this.zkw != null) {
LOG.info("Stopping " + this.zkw);
this.zkw.close();
}
@@ -177,20 +197,4 @@ public class ReplicationLogCleaner extends
BaseLogCleanerDelegate {
public boolean isStopped() {
return this.stopped;
}
-
- public static class WarnOnlyAbortable implements Abortable {
-
- @Override
- public void abort(String why, Throwable e) {
- LOG.warn("ReplicationLogCleaner received abort, ignoring. Reason: " +
why);
- if (LOG.isDebugEnabled()) {
- LOG.debug(e);
- }
- }
-
- @Override
- public boolean isAborted() {
- return false;
- }
- }
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
index b295484..39cbc96 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
@@ -19,17 +19,25 @@ package org.apache.hadoop.hbase.master.cleaner;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URLEncoder;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -44,11 +52,11 @@ import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
@@ -59,10 +67,13 @@ import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -151,7 +162,7 @@ public class TestLogsCleaner {
assertEquals(34, fs.listStatus(oldLogDir).length);
- LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, oldLogDir,
POOL);
+ LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, oldLogDir,
POOL, null);
cleaner.chore();
@@ -175,7 +186,7 @@ public class TestLogsCleaner {
ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
cleaner.setConf(conf);
- ReplicationQueuesClient rqcMock =
Mockito.mock(ReplicationQueuesClient.class);
+ ReplicationQueuesClient rqcMock = mock(ReplicationQueuesClient.class);
Mockito.when(rqcMock.getQueuesZNodeCversion()).thenReturn(1, 2, 3, 4);
Field rqc =
ReplicationLogCleaner.class.getDeclaredField("replicationQueues");
@@ -201,11 +212,12 @@ public class TestLogsCleaner {
FaultyZooKeeperWatcher faultyZK =
new FaultyZooKeeperWatcher(conf, "testZooKeeperAbort-faulty", null);
final AtomicBoolean getListOfReplicatorsFailed = new AtomicBoolean(false);
+ TestAbortable abortable = new TestAbortable();
try {
faultyZK.init();
ReplicationQueuesClient replicationQueuesClient =
spy(ReplicationFactory.getReplicationQueuesClient(
- faultyZK, conf, new ReplicationLogCleaner.WarnOnlyAbortable()));
+ faultyZK, conf, abortable));
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
@@ -219,11 +231,12 @@ public class TestLogsCleaner {
}).when(replicationQueuesClient).getListOfReplicators();
replicationQueuesClient.init();
- cleaner.setConf(conf, faultyZK, replicationQueuesClient);
+ cleaner.init(conf, faultyZK, replicationQueuesClient);
// should keep all files due to a ConnectionLossException getting the
queues znodes
Iterable<FileStatus> toDelete = cleaner.getDeletableFiles(dummyFiles);
assertTrue(getListOfReplicatorsFailed.get());
+ assertTrue(abortable.isAborted());
assertFalse(toDelete.iterator().hasNext());
assertFalse(cleaner.isStopped());
} finally {
@@ -247,7 +260,7 @@ public class TestLogsCleaner {
ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf,
"testZooKeeperAbort-normal", null);
try {
- cleaner.setConf(conf, zkw);
+ cleaner.init(conf, zkw, null);
Iterable<FileStatus> filesToDelete =
cleaner.getDeletableFiles(dummyFiles);
Iterator<FileStatus> iter = filesToDelete.iterator();
assertTrue(iter.hasNext());
@@ -274,7 +287,7 @@ public class TestLogsCleaner {
Path oldWALsDir = new Path(TEST_UTIL.getDefaultRootDirPath(),
HConstants.HREGION_OLDLOGDIR_NAME);
FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
- final LogCleaner cleaner = new LogCleaner(3000, server, conf, fs,
oldWALsDir, POOL);
+ final LogCleaner cleaner = new LogCleaner(3000, server, conf, fs,
oldWALsDir, POOL, null);
assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE,
cleaner.getSizeOfCleaners());
assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
cleaner.getCleanerThreadTimeoutMsec());
@@ -391,7 +404,7 @@ public class TestLogsCleaner {
private RecoverableZooKeeper zk;
public FaultyZooKeeperWatcher(Configuration conf, String identifier,
Abortable abortable)
- throws ZooKeeperConnectionException, IOException {
+ throws IOException {
super(conf, identifier, abortable);
}
@@ -405,4 +418,78 @@ public class TestLogsCleaner {
return zk;
}
}
+
+ /**
+ * An {@link Abortable} implementation for tests.
+ */
+ class TestAbortable implements Abortable {
+ private volatile boolean aborted = false;
+
+ @Override
+ public void abort(String why, Throwable e) {
+ this.aborted = true;
+ }
+
+ @Override
+ public boolean isAborted() {
+ return this.aborted;
+ }
+ }
+
+ /**
+ * Throw SessionExpiredException when zk#getData is called.
+ */
+ static class SessionExpiredZooKeeperWatcher extends ZooKeeperWatcher {
+ private RecoverableZooKeeper zk;
+
+ public SessionExpiredZooKeeperWatcher(Configuration conf, String
identifier,
+ Abortable abortable) throws
IOException {
+ super(conf, identifier, abortable);
+ }
+
+ public void init() throws Exception {
+ this.zk = spy(super.getRecoverableZooKeeper());
+ doThrow(new KeeperException.SessionExpiredException())
+ .when(zk).getData(Mockito.anyString(), Mockito.any(Watcher.class),
Mockito.any(Stat.class));
+ }
+
+ @Override
+ public RecoverableZooKeeper getRecoverableZooKeeper() {
+ return zk;
+ }
+ }
+
+ /**
+ * Tests that HMaster#abort will be called if ReplicationLogCleaner
+ * encounters SessionExpiredException which is unrecoverable.
+ * @throws Exception Exception
+ */
+ @Test
+ public void testZookeeperSessionExpired() throws Exception {
+ Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+ try(SessionExpiredZooKeeperWatcher sessionExpiredZK =
+ new SessionExpiredZooKeeperWatcher(conf,
"testSessionExpiredZk-faulty", null)) {
+ sessionExpiredZK.init();
+ ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
+ cleaner.setConf(conf);
+ // Mock HMaster
+ HMaster master = mock(HMaster.class);
+ // Return SessionExpired Zookeeper.
+ doReturn(sessionExpiredZK).when(master).getZooKeeper();
+ doNothing().when(master).abort(Mockito.anyString(),
Mockito.any(Throwable.class));
+ Map<String, Object> params = new HashMap<>();
+ params.put(HMaster.MASTER, master);
+ cleaner.init(params);
+ // This will throw SessionExpiredException
+ cleaner.getDeletableFiles(new LinkedList<FileStatus>());
+ // make sure that HMaster#abort was called.
+ ArgumentCaptor<Throwable> throwableCaptor =
ArgumentCaptor.forClass(Throwable.class);
+
+ verify(master, times(1))
+ .abort(Mockito.anyString(), throwableCaptor.capture());
+ assertNotNull(throwableCaptor.getValue());
+ assertTrue("Should be SessionExpiredException",
+ throwableCaptor.getValue() instanceof
KeeperException.SessionExpiredException);
+ }
+ }
}