This is an automated email from the ASF dual-hosted git repository.

dengzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 2fdb83941c3 HIVE-29274: Flaky 
TestMetastoreLeaseLeader#testHouseKeepingThreads (#6142)
2fdb83941c3 is described below

commit 2fdb83941c3194f7fec8aa900339e77f0b7c12b5
Author: dengzh <[email protected]>
AuthorDate: Mon Oct 27 20:37:31 2025 +0800

    HIVE-29274: Flaky TestMetastoreLeaseLeader#testHouseKeepingThreads (#6142)
---
 .../MetastoreHousekeepingLeaderTestBase.java       | 218 ++++++++++++++++++++-
 .../metastore/TestMetastoreHousekeepingLeader.java |  28 +--
 ...TestMetastoreHousekeepingLeaderEmptyConfig.java |  29 +--
 .../TestMetastoreHousekeepingNonLeader.java        |  37 +---
 .../hive/metastore/TestMetastoreLeaseLeader.java   |  74 +++----
 .../metastore/TestMetastoreLeaseNonLeader.java     |  50 +++--
 .../hive/metastore/PersistenceManagerProvider.java |   8 +-
 .../hive/metastore/leader/LeaderElection.java      |  16 +-
 .../metastore/leader/LeaderElectionContext.java    |  51 ++---
 .../metastore/leader/LeaderElectionFactory.java    |  67 ++++++-
 .../hive/metastore/leader/LeaseLeaderElection.java | 132 ++++---------
 .../apache/hadoop/hive/metastore/txn/TxnStore.java |   3 +-
 .../hive/metastore/leader/TestLeaderElection.java  |  59 +-----
 13 files changed, 419 insertions(+), 353 deletions(-)

diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java
index e8ec181a684..61c81c29af7 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java
@@ -18,10 +18,16 @@
 
 package org.apache.hadoop.hive.metastore;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.common.TableName;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.leader.LeaderElection;
+import org.apache.hadoop.hive.metastore.leader.LeaderElectionContext;
+import org.apache.hadoop.hive.metastore.leader.LeaderElectionFactory;
+import org.apache.hadoop.hive.metastore.leader.LeaseLeaderElection;
 import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
 import org.apache.hadoop.hive.ql.stats.StatsUpdaterThread;
 import org.apache.hadoop.hive.ql.txn.compactor.Cleaner;
@@ -31,18 +37,23 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 /**
  * Base class for HMS leader config testing.
  */
-class MetastoreHousekeepingLeaderTestBase {
+abstract class MetastoreHousekeepingLeaderTestBase {
   private static final Logger LOG = 
LoggerFactory.getLogger(MetastoreHousekeepingLeaderTestBase.class);
   private static HiveMetaStoreClient client;
-  protected static Configuration conf = MetastoreConf.newMetastoreConf();
+  protected Configuration conf;
   private static Warehouse warehouse;
   private static boolean isServerStarted = false;
   private static int port;
@@ -54,12 +65,15 @@ class MetastoreHousekeepingLeaderTestBase {
   static Map<String, Boolean> threadNames = new HashMap<>();
   static Map<Class<? extends Thread>, Boolean> threadClasses = new HashMap<>();
 
-  void internalSetup(final String leaderHostName, boolean configuredLeader) 
throws Exception {
+  void setup(final String leaderHostName, Configuration configuration) throws 
Exception {
+    this.conf = configuration;
     MetaStoreTestUtils.setConfForStandloneMode(conf);
     MetastoreConf.setVar(conf, ConfVars.THRIFT_BIND_HOST, "localhost");
-    MetastoreConf.setVar(conf, 
ConfVars.METASTORE_HOUSEKEEPING_LEADER_HOSTNAME, leaderHostName);
     MetastoreConf.setVar(conf, ConfVars.METASTORE_HOUSEKEEPING_LEADER_ELECTION,
-        configuredLeader ? "host" : "lock");
+        leaderHostName != null ? LeaderElectionFactory.Method.HOST.name() : 
LeaderElectionFactory.Method.LOCK.name());
+    if (leaderHostName != null) {
+      MetastoreConf.setVar(conf, 
ConfVars.METASTORE_HOUSEKEEPING_LEADER_HOSTNAME, leaderHostName);
+    }
     MetastoreConf.setBoolVar(conf, 
MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
     MetastoreConf.setBoolVar(conf, 
MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON, true);
 
@@ -193,5 +207,199 @@ private void resetThreadStatus() {
     threadNames.forEach((name, status) -> threadNames.put(name, false));
     threadClasses.forEach((thread, status) -> threadClasses.put(thread, 
false));
   }
+
+  static class CombinedLeaderElector implements AutoCloseable {
+    List<Pair<TableName, LeaderElection<TableName>>> elections = new 
ArrayList<>();
+    private final Configuration configuration;
+    private String name;
+
+    CombinedLeaderElector(Configuration conf) throws IOException {
+      this.configuration = conf;
+      for (LeaderElectionContext.TTYPE type : 
LeaderElectionContext.TTYPE.values()) {
+        TableName table = type.getTableName();
+        elections.add(Pair.of(table, new LeaseLeaderElection()));
+      }
+    }
+
+    public void tryBeLeader() throws Exception {
+      int i = 0;
+      for (Pair<TableName, LeaderElection<TableName>> election : elections) {
+        LeaderElection<TableName> le = election.getRight();
+        le.setName(name + "-" + i++);
+        le.tryBeLeader(configuration, election.getLeft());
+      }
+    }
+
+    public boolean isLeader() {
+      boolean isLeader = true;
+      for (Pair<TableName, LeaderElection<TableName>> election : elections) {
+        isLeader &= election.getRight().isLeader();
+      }
+      return isLeader;
+    }
+
+    public void setName(String name) {
+      this.name = name;
+    }
+
+    @Override
+    public void close() throws Exception {
+      for (Pair<TableName, LeaderElection<TableName>> election : elections) {
+        election.getRight().close();
+      }
+    }
+  }
+
+  static class ReleaseAndRequireLease extends LeaseLeaderElection {
+    private static CountDownLatch latch;
+    private final Configuration configuration;
+    private final boolean needRenewLease;
+    private TableName tableName;
+
+    public static void setMonitor(CountDownLatch latch) {
+      ReleaseAndRequireLease.latch = latch;
+    }
+    public static void reset() {
+      ReleaseAndRequireLease.latch = null;
+    }
+
+    public ReleaseAndRequireLease(Configuration conf, boolean needRenewLease) 
throws IOException {
+      super();
+      this.configuration = conf;
+      this.needRenewLease = needRenewLease;
+    }
+
+    @Override
+    public void setName(String name) {
+      super.setName(name);
+      LeaderElectionContext.TTYPE type = null;
+      for (LeaderElectionContext.TTYPE value : 
LeaderElectionContext.TTYPE.values()) {
+        if (value.getName().equalsIgnoreCase(name)) {
+          type = value;
+          break;
+        }
+      }
+      if (type == null) {
+        // This shouldn't happen at all
+        throw new AssertionError("Unknown elector name: " + name);
+      }
+      this.tableName = type.getTableName();
+    }
+
+    @Override
+    protected void notifyListener() {
+      ScheduledExecutorService service = null;
+      if (!isLeader) {
+        try {
+          service = ThreadPool.getPool();
+        } catch (Exception ignored) {
+        }
+      }
+      super.notifyListener();
+      if (isLeader) {
+        if (!needRenewLease) {
+          super.shutdownWatcher();
+          // In our tests, the time spent on notifying the listener might be 
greater than the lease timeout,
+          // which makes the leader lose the leadership quickly after waking up, 
and kills all housekeeping services.
+          // Make sure the leader is still valid while notifying the listener, 
and switch to ReleaseAndRequireWatcher
+          // after all listeners finish their work.
+          heartbeater = new ReleaseAndRequireWatcher(configuration, tableName);
+          heartbeater.startWatch();
+        }
+      } else {
+        if (service != null) {
+          // If the housekeeping task is running behind
+          Assert.assertTrue(service.isShutdown());
+          // Interrupt all sleeping tasks
+          service.shutdownNow();
+          try {
+            // This is the last one to get notified; sleep for a while to make sure 
all other
+            // services have been stopped before returning
+            Thread.sleep(12000);
+          } catch (InterruptedException ignore) {
+          }
+        }
+      }
+      if (latch != null) {
+        latch.countDown();
+      }
+    }
+
+    // For testing purposes only: the lock is left to time out and is then acquired 
again
+    private class ReleaseAndRequireWatcher extends LeaseWatcher {
+      long timeout;
+      public ReleaseAndRequireWatcher(Configuration conf,
+          TableName tableName) {
+        super(conf, tableName);
+        timeout = MetastoreConf.getTimeVar(conf,
+            MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS) + 3000;
+        setName("ReleaseAndRequireWatcher-" + ((name != null) ? name + "-" : 
"") + ID.incrementAndGet());
+      }
+
+      @Override
+      public void beforeRun() {
+        try {
+          Thread.sleep(timeout);
+        } catch (InterruptedException e) {
+          // ignore this
+        }
+      }
+
+      @Override
+      public void runInternal() {
+        shutDown();
+        // The timeout lock should be cleaned,
+        // sleep some time to let others take the chance to become the leader
+        try {
+          Thread.sleep(5000);
+        } catch (InterruptedException e) {
+          // ignore
+        }
+        // Acquire the lock again
+        conf = new Configuration(conf);
+        reclaim();
+      }
+    }
+  }
+
+  public void checkHouseKeepingThreadExistence(boolean isLeader) throws 
Exception {
+    searchHousekeepingThreads();
+
+    // Verify existence of threads
+    for (Map.Entry<String, Boolean> entry : threadNames.entrySet()) {
+      if (entry.getValue()) {
+        LOG.info("Found thread with name {}", entry.getKey());
+      } else {
+        LOG.info("No thread found with name {}", entry.getKey());
+      }
+      if (isLeader) {
+        Assert.assertTrue("No thread with name " + entry.getKey() + " found.", 
entry.getValue());
+      } else {
+        Assert.assertFalse("Thread with name " + entry.getKey() + " found.", 
entry.getValue());
+      }
+    }
+
+    for (Map.Entry<Class<? extends Thread>, Boolean> entry : 
threadClasses.entrySet()) {
+      if (isLeader) {
+        if (entry.getValue()) {
+          LOG.info("Found thread for {}", entry.getKey().getSimpleName());
+        }
+        Assert.assertTrue("No thread found for class " + 
entry.getKey().getSimpleName(), entry.getValue());
+      } else {
+        // A non-leader HMS will still run the configured number of Compaction 
worker threads.
+        if (entry.getKey() == Worker.class) {
+          if (entry.getValue()) {
+            LOG.info("Thread found for " + entry.getKey().getSimpleName());
+          }
+        } else {
+          if (!entry.getValue()) {
+            LOG.info("No thread found for " + entry.getKey().getSimpleName());
+          }
+          Assert.assertFalse("Thread found for class " + 
entry.getKey().getSimpleName(),
+              entry.getValue());
+        }
+      }
+    }
+  }
 }
 
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingLeader.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingLeader.java
index 3df58ed2919..ae7cf125759 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingLeader.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingLeader.java
@@ -18,44 +18,24 @@
 
 package org.apache.hadoop.hive.metastore;
 
-import org.junit.Assert;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.junit.Before;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
 
 /**
  * Test for specifying a valid hostname as HMS leader.
  */
 public class TestMetastoreHousekeepingLeader extends 
MetastoreHousekeepingLeaderTestBase {
-  private static final Logger LOG = 
LoggerFactory.getLogger(TestMetastoreHousekeepingLeader.class);
 
   @Before
   public void setUp() throws Exception {
-    internalSetup("localhost", true);
+    setup("localhost", MetastoreConf.newMetastoreConf());
   }
 
   @Test
   public void testHouseKeepingThreadExistence() throws Exception {
-    searchHousekeepingThreads();
-
-    // Verify existence of threads
-    for (Map.Entry<String, Boolean> entry : threadNames.entrySet()) {
-      if (entry.getValue()) {
-        LOG.info("Found thread with name " + entry.getKey());
-      }
-      Assert.assertTrue("No thread with name " + entry.getKey() + " found.", 
entry.getValue());
-    }
-
-    for (Map.Entry<Class<? extends Thread>, Boolean> entry : 
threadClasses.entrySet()) {
-      if (entry.getValue()) {
-        LOG.info("Found thread for " + entry.getKey().getSimpleName());
-      }
-      Assert.assertTrue("No thread found for class " + 
entry.getKey().getSimpleName(),
-              entry.getValue());
-    }
+    checkHouseKeepingThreadExistence(true);
   }
+
 }
 
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingLeaderEmptyConfig.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingLeaderEmptyConfig.java
index 7f05902cc8f..69f01a85c2c 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingLeaderEmptyConfig.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingLeaderEmptyConfig.java
@@ -18,45 +18,24 @@
 
 package org.apache.hadoop.hive.metastore;
 
-import org.junit.Assert;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.junit.Before;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
 
 /**
  * Test for specifying empty HMS leader.
  */
 public class TestMetastoreHousekeepingLeaderEmptyConfig extends 
MetastoreHousekeepingLeaderTestBase {
-  private static final Logger LOG =
-          
LoggerFactory.getLogger(TestMetastoreHousekeepingLeaderEmptyConfig.class);
 
   @Before
   public void setUp() throws Exception {
     // Empty string for leader indicates that the HMS is leader.
-    internalSetup("", true);
+    setup("", MetastoreConf.newMetastoreConf());
   }
 
   @Test
   public void testHouseKeepingThreadExistence() throws Exception {
-    searchHousekeepingThreads();
-
-    // Verify existence of threads
-    for (Map.Entry<String, Boolean> entry : threadNames.entrySet()) {
-      if (entry.getValue()) {
-        LOG.info("Found thread with name " + entry.getKey());
-      }
-      Assert.assertTrue("No thread with name " + entry.getKey() + " found.", 
entry.getValue());
-    }
-
-    for (Map.Entry<Class<? extends Thread>, Boolean> entry : 
threadClasses.entrySet()) {
-      if (entry.getValue()) {
-        LOG.info("Found thread for " + entry.getKey().getSimpleName());
-      }
-      Assert.assertTrue("No thread found for class " + 
entry.getKey().getSimpleName(),
-              entry.getValue());
-    }
+    checkHouseKeepingThreadExistence(true);
   }
+
 }
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingNonLeader.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingNonLeader.java
index 4a0212d6860..1a113c9cc5c 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingNonLeader.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingNonLeader.java
@@ -18,53 +18,24 @@
 
 package org.apache.hadoop.hive.metastore;
 
-import org.apache.hadoop.hive.ql.txn.compactor.Worker;
-import org.junit.Assert;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.junit.Before;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
 
 /**
  * Test for specifying HMS leader other than the current one.
  */
 public class TestMetastoreHousekeepingNonLeader extends 
MetastoreHousekeepingLeaderTestBase {
-  private static final Logger LOG =
-          
LoggerFactory.getLogger(TestMetastoreHousekeepingLeaderEmptyConfig.class);
 
   @Before
   public void setUp() throws Exception {
     // Empty string for leader indicates that the HMS is leader.
-    internalSetup("some_non_leader_host.domain1.domain", true);
+    setup("some_non_leader_host.domain1.domain", 
MetastoreConf.newMetastoreConf());
   }
 
   @Test
   public void testHouseKeepingThreadExistence() throws Exception {
-    searchHousekeepingThreads();
-
-    // Verify existence of threads
-    for (Map.Entry<String, Boolean> entry : threadNames.entrySet()) {
-      if (!entry.getValue()) {
-        LOG.info("No thread found with name " + entry.getKey());
-      }
-      Assert.assertFalse("Thread with name " + entry.getKey() + " found.", 
entry.getValue());
-    }
-
-    for (Map.Entry<Class<? extends Thread>, Boolean> entry : 
threadClasses.entrySet()) {
-      // A non-leader HMS will still run the configured number of Compaction 
worker threads.
-      if (entry.getKey() == Worker.class) {
-        if (entry.getValue()) {
-          LOG.info("Thread found for " + entry.getKey().getSimpleName());
-        }
-      } else {
-        if (!entry.getValue()) {
-          LOG.info("No thread found for " + entry.getKey().getSimpleName());
-        }
-        Assert.assertFalse("Thread found for class " + 
entry.getKey().getSimpleName(),
-                entry.getValue());
-      }
-    }
+    checkHouseKeepingThreadExistence(false);
   }
+
 }
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreLeaseLeader.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreLeaseLeader.java
index 88231b3ebba..8e570c9b389 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreLeaseLeader.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreLeaseLeader.java
@@ -19,65 +19,65 @@
 package org.apache.hadoop.hive.metastore;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.common.TableName;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.leader.LeaderElection;
 import org.apache.hadoop.hive.metastore.leader.LeaderElectionContext;
-import org.apache.hadoop.hive.metastore.leader.LeaseLeaderElection;
+import org.apache.hadoop.hive.metastore.leader.LeaderElectionFactory;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-public class TestMetastoreLeaseLeader {
+public class TestMetastoreLeaseLeader extends 
MetastoreHousekeepingLeaderTestBase {
 
-  LeaderElection election;
-
-  TestMetastoreHousekeepingLeader hms;
+  CombinedLeaderElector elector;
 
   @Before
   public void setUp() throws Exception {
-    hms = new TestMetastoreHousekeepingLeader();
-    MetastoreConf.setTimeVar(hms.conf, MetastoreConf.ConfVars.TXN_TIMEOUT, 3, 
TimeUnit.SECONDS);
-    MetastoreConf.setTimeVar(hms.conf, 
MetastoreConf.ConfVars.LOCK_SLEEP_BETWEEN_RETRIES, 1, TimeUnit.SECONDS);
-    hms.conf.setBoolean(LeaseLeaderElection.METASTORE_RENEW_LEASE, false);
-    hms.conf.setBoolean(LeaderElectionContext.LEADER_IN_TEST, true);
-    hms.conf.set("hive.txn.manager", 
"org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
-    hms.internalSetup("", false);
+    Configuration configuration = MetastoreConf.newMetastoreConf();
+    MetastoreConf.setTimeVar(configuration, 
MetastoreConf.ConfVars.TXN_TIMEOUT, 3, TimeUnit.SECONDS);
+    MetastoreConf.setTimeVar(configuration, 
MetastoreConf.ConfVars.LOCK_SLEEP_BETWEEN_RETRIES, 200, TimeUnit.MILLISECONDS);
+    MetastoreConf.setLongVar(configuration, 
MetastoreConf.ConfVars.HMS_HANDLER_ATTEMPTS, 3);
+    MetastoreConf.setTimeVar(configuration, 
MetastoreConf.ConfVars.HMS_HANDLER_INTERVAL, 100, TimeUnit.MILLISECONDS);
+    configuration.set("hive.txn.manager", 
"org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+    
LeaderElectionFactory.addElectionCreator(LeaderElectionFactory.Method.LOCK, 
conf -> new ReleaseAndRequireLease(conf, false));
+    setup(null, configuration);
 
-    Configuration conf = MetastoreConf.newMetastoreConf();
-    MetastoreConf.setTimeVar(conf, 
MetastoreConf.ConfVars.LOCK_SLEEP_BETWEEN_RETRIES, 1, TimeUnit.SECONDS);
-    MetastoreConf.setTimeVar(conf, MetastoreConf.ConfVars.TXN_TIMEOUT, 3, 
TimeUnit.SECONDS);
-    MetastoreConf.setVar(conf, 
MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_ELECTION, "lock");
-    election = new LeaseLeaderElection();
-    TableName tableName = (TableName) 
LeaderElectionContext.getLeaderMutex(conf,
-        LeaderElectionContext.TTYPE.HOUSEKEEPING, null);
-    election.tryBeLeader(conf, tableName);
+    Configuration conf = new Configuration(configuration);
+    elector = new CombinedLeaderElector(conf);
+    elector.setName("TestMetastoreLeaseLeader");
+    elector.tryBeLeader();
   }
 
   @Test
   public void testHouseKeepingThreads() throws Exception {
+    int size = LeaderElectionContext.TTYPE.values().length;
+    CountDownLatch latch = new CountDownLatch(size);
+    
MetastoreHousekeepingLeaderTestBase.ReleaseAndRequireLease.setMonitor(latch);
     // hms is the leader now
-    hms.testHouseKeepingThreadExistence();
-    assertFalse(election.isLeader());
-    Thread.sleep(15 * 1000);
-    // the lease of hms is timeout, election becomes leader now
-    assertTrue(election.isLeader());
-    try {
-      // hms should shutdown all housekeeping tasks
-      hms.testHouseKeepingThreadExistence();
-      throw new IllegalStateException("HMS should shutdown all housekeeping 
tasks");
-    } catch (AssertionError e) {
-      // expected
-    }
+    checkHouseKeepingThreadExistence(true);
+    assertFalse(elector.isLeader());
+    latch.await();
+    // the lease of hms has timed out, so the elector becomes leader now
+    assertTrue(elector.isLeader());
+    // hms should shut down all housekeeping tasks
+    checkHouseKeepingThreadExistence(false);
 
-    election.close();
-    Thread.sleep(10000);
+    latch = new CountDownLatch(size);
+    
MetastoreHousekeepingLeaderTestBase.ReleaseAndRequireLease.setMonitor(latch);
+    elector.close();
+    latch.await();
     // hms becomes leader again
-    hms.testHouseKeepingThreadExistence();
+    checkHouseKeepingThreadExistence(true);
+  }
+
+  @After
+  public void afterTest() {
+    MetastoreHousekeepingLeaderTestBase.ReleaseAndRequireLease.reset();
   }
 
 }
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreLeaseNonLeader.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreLeaseNonLeader.java
index 6a340e4e661..0873c129454 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreLeaseNonLeader.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreLeaseNonLeader.java
@@ -19,56 +19,54 @@
 package org.apache.hadoop.hive.metastore;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.common.TableName;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.leader.LeaderElection;
 import org.apache.hadoop.hive.metastore.leader.LeaderElectionContext;
-import org.apache.hadoop.hive.metastore.leader.LeaseLeaderElection;
+import org.apache.hadoop.hive.metastore.leader.LeaderElectionFactory;
 import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertTrue;
 
-public class TestMetastoreLeaseNonLeader {
+public class TestMetastoreLeaseNonLeader extends 
MetastoreHousekeepingLeaderTestBase {
 
-  LeaderElection election;
-
-  TestMetastoreHousekeepingLeader hms;
+  CombinedLeaderElector elector;
 
   @Before
   public void setUp() throws Exception {
     Configuration conf = MetastoreConf.newMetastoreConf();
     TestTxnDbUtil.setConfValues(conf);
     TestTxnDbUtil.prepDb(conf);
-    election = new LeaseLeaderElection();
-    MetastoreConf.setVar(conf, 
MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_ELECTION, "lock");
-    TableName tableName = (TableName) 
LeaderElectionContext.getLeaderMutex(conf,
-        LeaderElectionContext.TTYPE.HOUSEKEEPING, null);
-    election.tryBeLeader(conf, tableName);
-    assertTrue("The elector should hold the lease now", election.isLeader());
+    elector = new CombinedLeaderElector(conf);
+    elector.setName("TestMetastoreLeaseNonLeader");
+    elector.tryBeLeader();
+    assertTrue("The elector should hold the lease now", elector.isLeader());
     // start the non-leader hms now
-    hms = new TestMetastoreHousekeepingLeader();
-    MetastoreConf.setTimeVar(hms.conf, 
MetastoreConf.ConfVars.LOCK_SLEEP_BETWEEN_RETRIES, 1, TimeUnit.SECONDS);
-    hms.conf.setBoolean(LeaderElectionContext.LEADER_IN_TEST, true);
-    hms.internalSetup("", false);
+    Configuration configuration = new Configuration(conf);
+    MetastoreConf.setTimeVar(configuration, 
MetastoreConf.ConfVars.LOCK_SLEEP_BETWEEN_RETRIES, 100, TimeUnit.MILLISECONDS);
+    
LeaderElectionFactory.addElectionCreator(LeaderElectionFactory.Method.LOCK,  c 
-> new ReleaseAndRequireLease(c, true));
+    setup(null, configuration);
   }
 
   @Test
   public void testHouseKeepingThreads() throws Exception {
-    try {
-      hms.testHouseKeepingThreadExistence();
-      throw new IllegalStateException("HMS shouldn't start any housekeeping 
tasks");
-    } catch (AssertionError e) {
-      // expected
-    }
+    checkHouseKeepingThreadExistence(false);
     // elector releases the lease
-    election.close();
-    Thread.sleep(10 * 1000);
+    CountDownLatch latch = new 
CountDownLatch(LeaderElectionContext.TTYPE.values().length);
+    
MetastoreHousekeepingLeaderTestBase.ReleaseAndRequireLease.setMonitor(latch);
+    elector.close();
+    latch.await();
     // housing threads are here now as the hms wins the election
-    hms.testHouseKeepingThreadExistence();
+    checkHouseKeepingThreadExistence(true);
+  }
+
+  @After
+  public void afterTest() {
+    MetastoreHousekeepingLeaderTestBase.ReleaseAndRequireLease.reset();
   }
 
 }
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PersistenceManagerProvider.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PersistenceManagerProvider.java
index bf99745cc7d..0ea4f54b297 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PersistenceManagerProvider.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PersistenceManagerProvider.java
@@ -173,10 +173,10 @@ private static void logPropChanges(Properties newProps) {
       return;
     }
     LOG.info("Updating the pmf due to property change");
-    if (LOG.isDebugEnabled() && !newProps.equals(prop)) {
+    if (!newProps.equals(prop)) {
       for (String key : prop.stringPropertyNames()) {
         if (!key.equals(newProps.get(key))) {
-          if (LOG.isDebugEnabled() && MetastoreConf.isPrintable(key)) {
+          if (MetastoreConf.isPrintable(key)) {
             // The jdbc connection url can contain sensitive information like 
username and password
             // which should be masked out before logging.
             String oldVal = prop.getProperty(key);
@@ -185,10 +185,10 @@ private static void logPropChanges(Properties newProps) {
               oldVal = MetaStoreServerUtils.anonymizeConnectionURL(oldVal);
               newVal = MetaStoreServerUtils.anonymizeConnectionURL(newVal);
             }
-            LOG.debug("Found {} to be different. Old val : {} : New Val : {}", 
key,
+            LOG.warn("Found {} to be different. Old val : {} : New Val : {}", 
key,
                 oldVal, newVal);
           } else {
-            LOG.debug("Found masked property {} to be different", key);
+            LOG.warn("Found masked property {} to be different", key);
           }
         }
       }
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElection.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElection.java
index 5a6ab5d77bb..88d463e2abb 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElection.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElection.java
@@ -44,49 +44,49 @@ public interface LeaderElection<T> extends Closeable {
    *              It can be a path in Zookeeper, or a table that going to be 
locked.
    * @throws LeaderException
    */
-  public void tryBeLeader(Configuration conf, T mutex)
+  void tryBeLeader(Configuration conf, T mutex)
       throws LeaderException;
 
   /**
    * Getting the result of election.
    * @return true if wins the election, false otherwise.
    */
-  public boolean isLeader();
+  boolean isLeader();
 
   /**
    * Register listeners to get notified when leadership changes.
    * @param listener The listener to be added
    */
-  public void addStateListener(LeadershipStateListener listener);
+  void addStateListener(LeadershipStateListener listener);
 
   /**
    * Set the name of this leader candidate
    * @param name the name
    */
-  public void setName(String name);
+  void setName(String name);
 
   /**
    * Get the name of this leader candidate
    */
-  public String getName();
+  String getName();
 
   default boolean enforceMutex() {
     return true;
   }
 
-  public interface LeadershipStateListener {
+  interface LeadershipStateListener {
 
     /**
      * Invoked when won the election.
      * @param election the election where happens.
      */
-    public void takeLeadership(LeaderElection election) throws Exception;
+    void takeLeadership(LeaderElection election) throws Exception;
 
     /**
      * Invoked when lost the election
      * @param election the election where happens.
      */
-    public void lossLeadership(LeaderElection election) throws Exception;
+    void lossLeadership(LeaderElection election) throws Exception;
 
   }
 
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionContext.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionContext.java
index a3652d1c001..14d18e832db 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionContext.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionContext.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -44,8 +45,6 @@ public class LeaderElectionContext {
   public enum TTYPE {
     HOUSEKEEPING(new TableName(Warehouse.DEFAULT_CATALOG_NAME, 
"__METASTORE_LEADER_ELECTION__",
         "metastore_housekeeping"), "housekeeping"),
-    WORKER(new TableName(Warehouse.DEFAULT_CATALOG_NAME, 
"__METASTORE_LEADER_ELECTION__",
-        "metastore_compactor_worker"), "compactor_worker"),
     ALWAYS_TASKS(new TableName(Warehouse.DEFAULT_CATALOG_NAME, 
"__METASTORE_LEADER_ELECTION__",
         "metastore_always_tasks"), "always_tasks");
     // Mutex of TTYPE, which can be a nonexistent table
@@ -74,9 +73,8 @@ public String getName() {
   // State change listeners group by type
   private final Map<TTYPE, List<LeadershipStateListener>> listeners;
   // Collection of leader candidates
-  private final List<LeaderElection> leaderElections = new ArrayList<>();
+  private final List<LeaderElection<?>> leaderElections = new ArrayList<>();
   // Property for testing, a single leader will be created
-  public final static String LEADER_IN_TEST = 
"metastore.leader.election.in.test";
 
   private LeaderElectionContext(String servHost, Configuration conf,
       Map<TTYPE, List<LeadershipStateListener>> listeners,
@@ -97,40 +95,32 @@ private LeaderElectionContext(String servHost, 
Configuration conf,
   }
 
   public void start() throws Exception {
-    Map<TTYPE, List<LeadershipStateListener>> listenerMap = this.listeners;
-    if (conf.getBoolean(LEADER_IN_TEST, false)) {
-      Map<TTYPE, List<LeadershipStateListener>> newListeners = new HashMap<>();
-      newListeners.put(TTYPE.HOUSEKEEPING, new ArrayList<>());
-      listenerMap.forEach((k, v) -> 
newListeners.get(TTYPE.HOUSEKEEPING).addAll(v));
-      listenerMap = newListeners;
-    }
-    for (Map.Entry<TTYPE, List<LeadershipStateListener>> entry :
-        listenerMap.entrySet()) {
-      List<LeadershipStateListener> listenerList = entry.getValue();
+    List<TTYPE> ttypes = new ArrayList<>(listeners.keySet());
+    Collections.shuffle(ttypes);
+    for (TTYPE ttype : ttypes) {
+      List<LeadershipStateListener> listenerList = listeners.get(ttype);
       if (listenerList.isEmpty()) {
         continue;
       }
       if (auditLeaderListener != null) {
-        listenerList.add(0, auditLeaderListener);
+        listenerList.addFirst(auditLeaderListener);
       }
-      TTYPE ttype = entry.getKey();
       LeaderElection leaderElection = LeaderElectionFactory.create(conf);
       leaderElection.setName(ttype.name);
-      listenerList.forEach(listener -> 
leaderElection.addStateListener(listener));
+      listenerList.forEach(leaderElection::addStateListener);
       leaderElections.add(leaderElection);
-      
       Thread daemon = new Thread(() -> {
         try {
-          Object mutex = getLeaderMutex(conf, ttype, servHost);
+          Object mutex = LeaderElectionFactory.getMutex(conf, ttype, servHost);
           leaderElection.tryBeLeader(conf, mutex);
         } catch (LeaderException e) {
           throw new RuntimeException("Error claiming to be leader: " + 
leaderElection.getName(), e);
         }
       });
-      daemon.setName("Metastore Election " + leaderElection.getName());
-      daemon.setDaemon(true);
 
       if (startAsDaemon) {
+        daemon.setName("Metastore Election " + leaderElection.getName());
+        daemon.setDaemon(true);
         daemon.start();
       } else {
         daemon.run();
@@ -148,25 +138,6 @@ public void close() {
     });
   }
 
-  public static Object getLeaderMutex(Configuration conf, TTYPE ttype, String 
servHost) {
-    String method =
-        MetastoreConf.getVar(conf, 
MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_ELECTION);
-    switch (method.toLowerCase()) {
-    case "host":
-      return servHost;
-    case "lock":
-      TableName mutex = ttype.getTableName();
-      String namespace =
-          MetastoreConf.getVar(conf, 
MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_LOCK_NAMESPACE);
-      if (StringUtils.isNotEmpty(namespace)) {
-        return new TableName(mutex.getCat(), namespace, mutex.getTable());
-      }
-      return mutex;
-    default:
-      throw new UnsupportedOperationException(method + " not supported for 
leader election");
-    }
-  }
-
   public static class ContextBuilder {
     private Configuration configuration;
     private boolean startAsDaemon;
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionFactory.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionFactory.java
index 5055ad8a003..ab4ca8bc4fa 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionFactory.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionFactory.java
@@ -18,27 +18,78 @@
 
 package org.apache.hadoop.hive.metastore.leader;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.commons.lang3.EnumUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.TableName;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 
 /**
  * Simple factory for creating the elector
  */
 public class LeaderElectionFactory {
+  public enum Method {
+    HOST, LOCK
+  }
+
+  private static final Map<Method, ElectionCreator> ELECTION_CREATOR_MAP = new 
ConcurrentHashMap<>();
+  static {
+    addElectionCreator(Method.HOST, conf -> new StaticLeaderElection());
+    addElectionCreator(Method.LOCK, conf -> new LeaseLeaderElection());
+  }
+
+  private LeaderElectionFactory() {
+    throw new AssertionError("The constructor shouldn't be called");
+  }
+
+  public static LeaderElection create(Configuration conf) throws IOException {
+    String method =
+        MetastoreConf.getVar(conf, 
MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_ELECTION);
+    Method m = EnumUtils.getEnum(Method.class, method.toUpperCase());
+    ElectionCreator creator = null;
+    if (m != null) {
+      creator = ELECTION_CREATOR_MAP.get(m);
+    }
+    if (creator == null) {
+      throw new UnsupportedOperationException(method + " not supported for 
electing the leader");
+    }
+    return creator.createElector(conf);
+  }
 
-  public static LeaderElection create(Configuration conf) throws IOException  {
+  public static Object getMutex(Configuration conf, 
LeaderElectionContext.TTYPE ttype, String servHost) {
     String method =
         MetastoreConf.getVar(conf, 
MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_ELECTION);
-    switch (method.toLowerCase()) {
-      case "host":
-        return new StaticLeaderElection();
-      case "lock":
-        return new LeaseLeaderElection();
-      default:
-        throw new UnsupportedOperationException(method + " is not supported 
for electing the leader");
+    Method m = EnumUtils.getEnum(Method.class, method.toUpperCase());
+    if (m != null) {
+      switch (m) {
+      case HOST:
+        return servHost;
+      case LOCK:
+        TableName mutex = ttype.getTableName();
+        String namespace =
+            MetastoreConf.getVar(conf, 
MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_LOCK_NAMESPACE);
+        if (StringUtils.isNotEmpty(namespace)) {
+          return new TableName(mutex.getCat(), namespace, mutex.getTable());
+        }
+        return mutex;
+      }
     }
+    throw new UnsupportedOperationException(method + " not supported for 
leader election");
+  }
+
+  @VisibleForTesting
+  public static void addElectionCreator(Method method, ElectionCreator 
creator) {
+    ELECTION_CREATOR_MAP.put(method, creator);
+  }
+
+  public interface ElectionCreator {
+    LeaderElection createElector(Configuration conf) throws IOException;
   }
 
 }
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java
index fc4d4078df2..ee635a6f9f5 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java
@@ -64,10 +64,10 @@ public class LeaseLeaderElection implements 
LeaderElection<TableName> {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(LeaseLeaderElection.class);
 
-  private static final AtomicLong ID = new AtomicLong();
+  protected static final AtomicLong ID = new AtomicLong();
 
   // Result of election
-  private volatile boolean isLeader;
+  protected volatile boolean isLeader;
 
   private TxnStore store;
 
@@ -76,7 +76,7 @@ public class LeaseLeaderElection implements 
LeaderElection<TableName> {
 
   // A daemon used for renewing the lock before timeout,
   // this happens when the current instance wins the election.
-  private LeaseWatcher heartbeater;
+  protected LeaseWatcher heartbeater;
 
   // For non-leader instances to check the lock periodically to
   // see if there is a chance to take over the leadership.
@@ -86,15 +86,14 @@ public class LeaseLeaderElection implements 
LeaderElection<TableName> {
   // Current lock id
   private volatile long lockId = -1;
 
-  // Leadership change listeners
-  private List<LeadershipStateListener> listeners = new ArrayList<>();
+  private volatile boolean stopped = false;
 
-  // Property for testing only
-  public static final String METASTORE_RENEW_LEASE = 
"metastore.renew.leader.lease";
+  // Leadership change listeners
+  private final List<LeadershipStateListener> listeners = new ArrayList<>();
 
-  private String name;
-  private String userName;
-  private String hostName;
+  protected String name;
+  private final String userName;
+  private final String hostName;
   private boolean enforceMutex;
 
   public LeaseLeaderElection() throws IOException {
@@ -111,10 +110,8 @@ private synchronized void doWork(LockResponse resp, 
Configuration conf,
 
     switch (resp.getState()) {
     case ACQUIRED:
-      boolean renewLease = conf.getBoolean(METASTORE_RENEW_LEASE, true);
-      heartbeater = renewLease ?
-          new Heartbeater(conf, tableName) : new 
ReleaseAndRequireWatcher(conf, tableName);
-      heartbeater.perform();
+      heartbeater = new Heartbeater(conf, tableName);
+      heartbeater.startWatch();
       if (!isLeader) {
         isLeader = true;
         notifyListener();
@@ -122,7 +119,7 @@ private synchronized void doWork(LockResponse resp, 
Configuration conf,
       break;
     case WAITING:
       nonLeaderWatcher = new NonLeaderWatcher(conf, tableName);
-      nonLeaderWatcher.perform();
+      nonLeaderWatcher.startWatch();
       if (isLeader) {
         isLeader = false;
         notifyListener();
@@ -134,7 +131,7 @@ private synchronized void doWork(LockResponse resp, 
Configuration conf,
     LOG.debug("Spent {}ms to notify the listeners, isLeader: {}", 
System.currentTimeMillis() - start, isLeader);
   }
 
-  private void notifyListener() {
+  protected void notifyListener() {
     listeners.forEach(listener -> {
       try {
         if (isLeader) {
@@ -143,8 +140,7 @@ private void notifyListener() {
           listener.lossLeadership(this);
         }
       } catch (Exception e) {
-        LOG.error("Error notifying the listener: " + listener +
-            ", leader: " + isLeader, e);
+        LOG.error("Error notifying the listener: {}, leader: {}", listener, 
isLeader, e);
       }
     });
   }
@@ -157,14 +153,15 @@ public void tryBeLeader(Configuration conf, TableName 
table) throws LeaderExcept
     if (store == null) {
       store = TxnUtils.getTxnStore(conf);
     }
-    LockComponent component = new LockComponentBuilder()
-        .setDbName(table.getDb())
-        .setTableName(table.getTable())
-        .setLock(LockType.EXCL_WRITE)
-        .setOperationType(DataOperationType.NO_TXN)
-        .build();
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(component);
+    
+    List<LockComponent> components = new ArrayList<>();
+    components.add(
+        new LockComponentBuilder()
+            .setDbName(table.getDb())
+            .setTableName(table.getTable())
+            .setLock(LockType.EXCL_WRITE)
+            .setOperationType(DataOperationType.NO_TXN)
+            .build());
 
     boolean lockable = false;
     Exception recentException = null;
@@ -173,28 +170,27 @@ public void tryBeLeader(Configuration conf, TableName 
table) throws LeaderExcept
     int numRetries = MetastoreConf.getIntVar(conf, 
MetastoreConf.ConfVars.LOCK_NUMRETRIES);
     long maxSleep = MetastoreConf.getTimeVar(conf,
         MetastoreConf.ConfVars.LOCK_SLEEP_BETWEEN_RETRIES, 
TimeUnit.MILLISECONDS);
-    for (int i = 0; i < numRetries; i++) {
+    for (int i = 0; i < numRetries && !stopped; i++) {
       try {
         LockResponse res = store.lock(req);
         if (res.getState() == LockState.WAITING || res.getState() == 
LockState.ACQUIRED) {
           lockable = true;
+          LOG.debug("{} Spent {}ms to take part in election, retries: {}", 
getName(), System.currentTimeMillis() - start, i);
           doWork(res, conf, table);
-          LOG.debug("Spent {}ms to lock the table {}, retries: {}", 
System.currentTimeMillis() - start, table, i);
           break;
         }
       } catch (NoSuchTxnException | TxnAbortedException e) {
         throw new AssertionError("This should not happen, we didn't open txn", 
e);
       } catch (MetaException e) {
         recentException = e;
-        LOG.warn("Error while locking the table: {}, num retries: {}," +
-                " max retries: {}, exception: {}", table, i, numRetries, e);
+        LOG.warn("Error while locking the table: {}, num retries: {}, max 
retries: {}",
+            table, i, numRetries, e);
       }
       backoff(maxSleep);
     }
     if (!lockable) {
       throw new LeaderException("Error locking the table: " + table + " in " + 
numRetries +
-          " retries, time spent: " + (System.currentTimeMillis() - start) + " 
ms",
-          recentException);
+          " retries, time spent: " + (System.currentTimeMillis() - start) + " 
ms", recentException);
     }
   }
 
@@ -206,11 +202,11 @@ private void backoff(long maxSleep) {
       nextSleep = maxSleep;
     try {
       Thread.sleep(nextSleep);
-    } catch (InterruptedException e) {
+    } catch (InterruptedException ignored) {
     }
   }
 
-  private void shutdownWatcher() {
+  protected void shutdownWatcher() {
     if (heartbeater != null) {
       heartbeater.shutDown();
       heartbeater = null;
@@ -232,7 +228,7 @@ public boolean isLeader() {
     return isLeader;
   }
 
-  private abstract class LeaseWatcher extends Thread {
+  protected abstract class LeaseWatcher extends Thread {
 
     protected Configuration conf;
 
@@ -240,17 +236,17 @@ private abstract class LeaseWatcher extends Thread {
 
     private volatile boolean stopped = false;
 
-    LeaseWatcher(Configuration conf, TableName tableName) {
+    protected LeaseWatcher(Configuration conf, TableName tableName) {
       this.conf = conf;
       this.tableName = tableName;
       setDaemon(true);
-      StringBuilder builder = new StringBuilder("Leader-Watcher-")
-          .append(name != null ? name : "")
+      StringBuilder builder = new StringBuilder("Lease-Watcher-")
+          .append(name != null ? name + "-" : "")
           .append(ID.incrementAndGet());
       setName(builder.toString());
     }
 
-    public void perform() {
+    public void startWatch() {
       LOG.info("Starting a watcher: {} for {}", getClass().getName(), name);
       start();
     }
@@ -275,11 +271,9 @@ public void shutDown() {
     }
 
     public void beforeRun() {
-      // do nothing
     }
 
     public void afterRun() {
-      // do nothing
     }
 
     public abstract void runInternal();
@@ -288,15 +282,15 @@ public void reclaim() {
       try {
         tryBeLeader(conf, tableName);
       } catch (Exception e) {
-        LOG.error("Error reclaiming the leader, will retry in next cycle", e);
+        LOG.error("Error reclaiming the lease, will retry in next cycle", e);
       }
     }
   }
 
   private class NonLeaderWatcher extends LeaseWatcher {
-    private long sleep;
+    private final long sleep;
     private int count;
-    private CheckLockRequest request;
+    private final CheckLockRequest request;
 
     NonLeaderWatcher(Configuration conf, TableName table) {
       super(conf, table);
@@ -348,8 +342,8 @@ public void afterRun() {
   }
 
   private class Heartbeater extends LeaseWatcher {
-    private HeartbeatRequest req;
-    private long heartbeatInterval;
+    private final HeartbeatRequest req;
+    private final long heartbeatInterval;
 
     Heartbeater(Configuration conf, TableName table) {
       super(conf, table);
@@ -389,7 +383,7 @@ public void runInternal() {
         reclaim();
       } catch (Exception e) {
         // Wait for next cycle.
-        LOG.warn("Heartbeat failed with exception: " + e.getMessage(), e);
+        LOG.warn("Heartbeat failed with exception: {}", e.getMessage(), e);
       }
     }
 
@@ -403,45 +397,9 @@ public void afterRun() {
     }
   }
 
-  // For testing purpose only, lock would become timeout and then acquire it 
again
-  private class ReleaseAndRequireWatcher extends LeaseWatcher {
-    long timeout;
-    public ReleaseAndRequireWatcher(Configuration conf,
-        TableName tableName) {
-      super(conf, tableName);
-      timeout = MetastoreConf.getTimeVar(conf,
-          MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS) + 3000;
-      setName("ReleaseAndRequireWatcher");
-    }
-
-    @Override
-    public void beforeRun() {
-      try {
-        Thread.sleep(timeout);
-      } catch (InterruptedException e) {
-        // ignore this
-      }
-    }
-
-    @Override
-    public void runInternal() {
-      shutDown();
-      // The timeout lock should be cleaned,
-      // sleep some time to let others take the chance to become the leader
-      try {
-        Thread.sleep(5000);
-      } catch (InterruptedException e) {
-        // ignore
-      }
-      // Acquire the lock again
-      conf = new Configuration(conf);
-      conf.setBoolean(METASTORE_RENEW_LEASE, true);
-      reclaim();
-    }
-  }
-
   @Override
   public void close() {
+    stopped = true;
     shutdownWatcher();
     if (isLeader) {
       isLeader = false;
@@ -454,15 +412,11 @@ public void close() {
       } catch (NoSuchLockException | TxnOpenException e) {
         // ignore
       } catch (Exception e) {
-        LOG.error("Error while unlocking: " + lockId, e);
+        LOG.error("Error while unlocking: {}", lockId, e);
       }
     }
   }
 
-  public long getLockId() {
-    return lockId;
-  }
-
   @Override
   public void setName(String name) {
     this.name = name;
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 13f32f64630..b77dd08601e 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -388,6 +388,7 @@ LockResponse lock(LockRequest rqst)
    * @throws TxnAbortedException
    * @throws MetaException
    */
+  @SqlRetry(lockInternally = true)
   @Transactional(POOL_TX)
   @RetrySemantics.SafeToRetry
   LockResponse checkLock(CheckLockRequest rqst)
@@ -425,7 +426,7 @@ void unlock(UnlockRequest rqst)
    * @throws TxnAbortedException
    * @throws MetaException
    */
-  @SqlRetry
+  @SqlRetry(lockInternally = true)
   @Transactional(POOL_TX)
   @RetrySemantics.SafeToRetry
   void heartbeat(HeartbeatRequest ids)
diff --git 
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/leader/TestLeaderElection.java
 
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/leader/TestLeaderElection.java
index 59f7dbc8fd6..c3d24cd2dce 100644
--- 
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/leader/TestLeaderElection.java
+++ 
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/leader/TestLeaderElection.java
@@ -20,16 +20,10 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.TableName;
-import org.apache.hadoop.hive.metastore.api.UnlockRequest;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.txn.TxnStore;
-import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
 import org.junit.Test;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -40,7 +34,7 @@ public class TestLeaderElection {
 
   @Test
   public void testConfigLeaderElection() throws Exception {
-    LeaderElection election = new StaticLeaderElection();
+    LeaderElection<String> election = new StaticLeaderElection();
     String leaderHost = "host1.work";
     Configuration configuration = MetastoreConf.newMetastoreConf();
     election.tryBeLeader(configuration, leaderHost);
@@ -87,9 +81,7 @@ public void testLeaseLeaderElection() throws Exception {
     MetastoreConf.setBoolVar(configuration, 
MetastoreConf.ConfVars.HIVE_IN_TEST, true);
     TestTxnDbUtil.setConfValues(configuration);
     TestTxnDbUtil.prepDb(configuration);
-    TxnStore txnStore = TxnUtils.getTxnStore(configuration);
 
-    configuration.setBoolean(LeaseLeaderElection.METASTORE_RENEW_LEASE, false);
     TableName mutex = new TableName("hive", "default", "leader_lease_ms");
     LeaseLeaderElection instance1 = new LeaseLeaderElection();
     AtomicBoolean flag1 = new AtomicBoolean(false);
@@ -98,7 +90,6 @@ public void testLeaseLeaderElection() throws Exception {
     // elect1 as a leader now
     assertTrue(flag1.get() && instance1.isLeader());
 
-    configuration.setBoolean(LeaseLeaderElection.METASTORE_RENEW_LEASE, true);
     LeaseLeaderElection instance2 = new LeaseLeaderElection();
     AtomicBoolean flag2 = new AtomicBoolean(false);
     instance2.addStateListener(new TestLeaderListener(flag2));
@@ -106,52 +97,14 @@ public void testLeaseLeaderElection() throws Exception {
     // instance2 should not be leader as elect1 holds the lease
     assertFalse(flag2.get() || instance2.isLeader());
 
-    ExecutorService service = Executors.newFixedThreadPool(4);
-    wait(service, flag1, flag2);
-    // now instance1 lease is timeout, the instance2 should be leader now
+    instance1.close();
+    synchronized (flag2) {
+      flag2.wait();
+    }
+    // now instance1 lease is released, the instance2 should be leader now
     assertTrue(instance2.isLeader() && flag2.get());
     assertFalse(flag1.get() || instance1.isLeader());
     assertTrue(flag2.get() && instance2.isLeader());
-
-    // remove leader's lease (instance2)
-    long lockId2 = instance2.getLockId();
-    txnStore.unlock(new UnlockRequest(lockId2));
-    wait(service, flag1, flag2);
-    assertFalse(flag2.get() || instance2.isLeader());
-    assertTrue(lockId2 > 0);
-    assertFalse(instance2.getLockId() == lockId2);
-
-    // remove leader's lease(instance1)
-    long lockId1 = instance1.getLockId();
-    txnStore.unlock(new UnlockRequest(lockId1));
-    wait(service, flag1, flag2);
-    assertFalse(lockId1 == instance1.getLockId());
-    assertTrue(lockId1 > 0);
-
-    for (int i = 0; i < 10; i++) {
-      assertFalse(flag1.get() || instance1.isLeader());
-      assertTrue(flag2.get() && instance2.isLeader());
-      Thread.sleep(1 * 1000);
-    }
-  }
-
-  private void wait(ExecutorService service, Object... obj) throws Exception {
-    Future[] fs = new Future[obj.length];
-    for (int i = 0; i < obj.length; i++) {
-      Object monitor = obj[i];
-      fs[i] = service.submit(() -> {
-        try {
-          synchronized (monitor) {
-            monitor.wait();
-          }
-        } catch (InterruptedException e) {
-          throw new RuntimeException(e);
-        }
-      });
-    }
-    for (Future f : fs) {
-      f.get();
-    }
   }
 
 }

Reply via email to