This is an automated email from the ASF dual-hosted git repository.

bbeaudreault pushed a commit to branch hubspot-2
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 078e2619ea01889e19b4419ac3eab66c4ac1d3ae
Author: Bo Cui <cuibo0...@163.com>
AuthorDate: Thu Jan 14 11:36:07 2021 +0800

    HubSpot Backport: HBASE-23340 hmaster /hbase/replication/rs session expired 
(hbase replication default value is true, we don't use it) causes logcleaner to 
be unable to clean oldWALs, which results in oldWALs too large (more than 2TB) (#2779)
    
        Signed-off-by: Duo Zhang <zhang...@apache.org>
        Signed-off-by: Pankaj Kumar<pankajku...@apache.org>
---
 .../org/apache/hadoop/hbase/master/HMaster.java    |  6 ++---
 .../hadoop/hbase/master/cleaner/LogCleaner.java    |  5 +++--
 .../replication/master/ReplicationLogCleaner.java  | 26 ++++++++++++++++------
 .../hbase/master/cleaner/TestLogsCleaner.java      |  4 ++--
 .../master/region/TestMasterRegionWALCleaner.java  |  2 +-
 5 files changed, 28 insertions(+), 15 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index ab765f74177..354c8feaa1e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -1417,17 +1417,17 @@ public class HMaster extends HRegionServer implements 
MasterServices {
 
     // Create cleaner thread pool
     cleanerPool = new DirScanPool(conf);
+    Map<String, Object> params = new HashMap<>();
+    params.put(MASTER, this);
     // Start log cleaner thread
     int cleanerInterval =
       conf.getInt(HBASE_MASTER_CLEANER_INTERVAL, 
DEFAULT_HBASE_MASTER_CLEANER_INTERVAL);
     this.logCleaner = new LogCleaner(cleanerInterval, this, conf,
-      getMasterWalManager().getFileSystem(), 
getMasterWalManager().getOldLogDir(), cleanerPool);
+      getMasterWalManager().getFileSystem(), 
getMasterWalManager().getOldLogDir(), cleanerPool, params);
     getChoreService().scheduleChore(logCleaner);
 
     // start the hfile archive cleaner thread
     Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
-    Map<String, Object> params = new HashMap<>();
-    params.put(MASTER, this);
     this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf,
       getMasterFileSystem().getFileSystem(), archiveDir, cleanerPool, params);
     getChoreService().scheduleChore(hfileCleaner);
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
index f65713ebf26..d8993b38ffe 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
@@ -22,6 +22,7 @@ import static 
org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -72,9 +73,9 @@ public class LogCleaner extends 
CleanerChore<BaseLogCleanerDelegate>
    * @param pool the thread pool used to scan directories
    */
   public LogCleaner(final int period, final Stoppable stopper, Configuration 
conf, FileSystem fs,
-    Path oldLogDir, DirScanPool pool) {
+    Path oldLogDir, DirScanPool pool, Map<String, Object> params) {
     super("LogsCleaner", period, stopper, conf, fs, oldLogDir, 
HBASE_MASTER_LOGCLEANER_PLUGINS,
-      pool);
+      pool, params);
     this.pendingDelete = new LinkedBlockingQueue<>();
     int size = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, 
DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
     this.oldWALsCleaner = createOldWalsCleaner(size);
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
index 148b33037cd..705302efcd2 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
@@ -19,16 +19,19 @@ package org.apache.hadoop.hbase.replication.master;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.Map;
 import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,7 +46,8 @@ import 
org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
 public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
   private static final Logger LOG = 
LoggerFactory.getLogger(ReplicationLogCleaner.class);
-  private ZKWatcher zkw;
+  private ZKWatcher zkw = null;
+  private boolean shareZK = false;
   private ReplicationQueueStorage queueStorage;
   private boolean stopped = false;
   private Set<String> wals;
@@ -92,12 +96,20 @@ public class ReplicationLogCleaner extends 
BaseLogCleanerDelegate {
   }
 
   @Override
-  public void setConf(Configuration config) {
-    // Make my own Configuration.  Then I'll have my own connection to zk that
-    // I can close myself when comes time.
-    Configuration conf = new Configuration(config);
+  public void init(Map<String, Object> params) {
+    super.init(params);
     try {
-      setConf(conf, new ZKWatcher(conf, "replicationLogCleaner", null));
+      if (MapUtils.isNotEmpty(params)) {
+        Object master = params.get(HMaster.MASTER);
+        if (master != null && master instanceof HMaster) {
+          zkw = ((HMaster) master).getZooKeeper();
+          shareZK = true;
+        }
+      }
+      if (zkw == null) {
+        zkw = new ZKWatcher(getConf(), "replicationLogCleaner", null);
+      }
+      this.queueStorage = 
ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
     } catch (IOException e) {
       LOG.error("Error while configuring " + this.getClass().getName(), e);
     }
@@ -118,7 +130,7 @@ public class ReplicationLogCleaner extends 
BaseLogCleanerDelegate {
   public void stop(String why) {
     if (this.stopped) return;
     this.stopped = true;
-    if (this.zkw != null) {
+    if (!shareZK && this.zkw != null) {
       LOG.info("Stopping " + this.zkw);
       this.zkw.close();
     }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
index b3d78fce096..ac29fee0a1a 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
@@ -200,7 +200,7 @@ public class TestLogsCleaner {
     // 10 procedure WALs
     assertEquals(10, fs.listStatus(OLD_PROCEDURE_WALS_DIR).length);
 
-    LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR, 
POOL);
+    LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR, 
POOL, null);
     cleaner.chore();
 
     // In oldWALs we end up with the current WAL, a newer WAL, the 3 old WALs 
which
@@ -297,7 +297,7 @@ public class TestLogsCleaner {
     Server server = new DummyServer();
 
     FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
-    LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, OLD_WALS_DIR, 
POOL);
+    LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, OLD_WALS_DIR, 
POOL, null);
     int size = cleaner.getSizeOfCleaners();
     assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
         cleaner.getCleanerThreadTimeoutMsec());
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionWALCleaner.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionWALCleaner.java
index 18afd3c9eb0..5da77379c43 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionWALCleaner.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/TestMasterRegionWALCleaner.java
@@ -72,7 +72,7 @@ public class TestMasterRegionWALCleaner extends 
MasterRegionTestBase {
       public boolean isStopped() {
         return stopped;
       }
-    }, conf, fs, globalWALArchiveDir, cleanerPool);
+    }, conf, fs, globalWALArchiveDir, cleanerPool, null);
     choreService.scheduleChore(logCleaner);
   }
 

Reply via email to