This is an automated email from the ASF dual-hosted git repository.
dengzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 2fdb83941c3 HIVE-29274: Flaky TestMetastoreLeaseLeader#testHouseKeepingThreads (#6142)
2fdb83941c3 is described below
commit 2fdb83941c3194f7fec8aa900339e77f0b7c12b5
Author: dengzh <[email protected]>
AuthorDate: Mon Oct 27 20:37:31 2025 +0800
HIVE-29274: Flaky TestMetastoreLeaseLeader#testHouseKeepingThreads (#6142)
---
.../MetastoreHousekeepingLeaderTestBase.java | 218 ++++++++++++++++++++-
.../metastore/TestMetastoreHousekeepingLeader.java | 28 +--
...TestMetastoreHousekeepingLeaderEmptyConfig.java | 29 +--
.../TestMetastoreHousekeepingNonLeader.java | 37 +---
.../hive/metastore/TestMetastoreLeaseLeader.java | 74 +++----
.../metastore/TestMetastoreLeaseNonLeader.java | 50 +++--
.../hive/metastore/PersistenceManagerProvider.java | 8 +-
.../hive/metastore/leader/LeaderElection.java | 16 +-
.../metastore/leader/LeaderElectionContext.java | 51 ++---
.../metastore/leader/LeaderElectionFactory.java | 67 ++++++-
.../hive/metastore/leader/LeaseLeaderElection.java | 132 ++++---------
.../apache/hadoop/hive/metastore/txn/TxnStore.java | 3 +-
.../hive/metastore/leader/TestLeaderElection.java | 59 +-----
13 files changed, 419 insertions(+), 353 deletions(-)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java
index e8ec181a684..61c81c29af7 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java
@@ -18,10 +18,16 @@
package org.apache.hadoop.hive.metastore;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.leader.LeaderElection;
+import org.apache.hadoop.hive.metastore.leader.LeaderElectionContext;
+import org.apache.hadoop.hive.metastore.leader.LeaderElectionFactory;
+import org.apache.hadoop.hive.metastore.leader.LeaseLeaderElection;
import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
import org.apache.hadoop.hive.ql.stats.StatsUpdaterThread;
import org.apache.hadoop.hive.ql.txn.compactor.Cleaner;
@@ -31,18 +37,23 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* Base class for HMS leader config testing.
*/
-class MetastoreHousekeepingLeaderTestBase {
+abstract class MetastoreHousekeepingLeaderTestBase {
private static final Logger LOG =
LoggerFactory.getLogger(MetastoreHousekeepingLeaderTestBase.class);
private static HiveMetaStoreClient client;
- protected static Configuration conf = MetastoreConf.newMetastoreConf();
+ protected Configuration conf;
private static Warehouse warehouse;
private static boolean isServerStarted = false;
private static int port;
@@ -54,12 +65,15 @@ class MetastoreHousekeepingLeaderTestBase {
static Map<String, Boolean> threadNames = new HashMap<>();
static Map<Class<? extends Thread>, Boolean> threadClasses = new HashMap<>();
- void internalSetup(final String leaderHostName, boolean configuredLeader)
throws Exception {
+ void setup(final String leaderHostName, Configuration configuration) throws
Exception {
+ this.conf = configuration;
MetaStoreTestUtils.setConfForStandloneMode(conf);
MetastoreConf.setVar(conf, ConfVars.THRIFT_BIND_HOST, "localhost");
- MetastoreConf.setVar(conf,
ConfVars.METASTORE_HOUSEKEEPING_LEADER_HOSTNAME, leaderHostName);
MetastoreConf.setVar(conf, ConfVars.METASTORE_HOUSEKEEPING_LEADER_ELECTION,
- configuredLeader ? "host" : "lock");
+ leaderHostName != null ? LeaderElectionFactory.Method.HOST.name() :
LeaderElectionFactory.Method.LOCK.name());
+ if (leaderHostName != null) {
+ MetastoreConf.setVar(conf,
ConfVars.METASTORE_HOUSEKEEPING_LEADER_HOSTNAME, leaderHostName);
+ }
MetastoreConf.setBoolVar(conf,
MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
MetastoreConf.setBoolVar(conf,
MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON, true);
@@ -193,5 +207,199 @@ private void resetThreadStatus() {
threadNames.forEach((name, status) -> threadNames.put(name, false));
threadClasses.forEach((thread, status) -> threadClasses.put(thread,
false));
}
+
+ static class CombinedLeaderElector implements AutoCloseable {
+ List<Pair<TableName, LeaderElection<TableName>>> elections = new
ArrayList<>();
+ private final Configuration configuration;
+ private String name;
+
+ CombinedLeaderElector(Configuration conf) throws IOException {
+ this.configuration = conf;
+ for (LeaderElectionContext.TTYPE type :
LeaderElectionContext.TTYPE.values()) {
+ TableName table = type.getTableName();
+ elections.add(Pair.of(table, new LeaseLeaderElection()));
+ }
+ }
+
+ public void tryBeLeader() throws Exception {
+ int i = 0;
+ for (Pair<TableName, LeaderElection<TableName>> election : elections) {
+ LeaderElection<TableName> le = election.getRight();
+ le.setName(name + "-" + i++);
+ le.tryBeLeader(configuration, election.getLeft());
+ }
+ }
+
+ public boolean isLeader() {
+ boolean isLeader = true;
+ for (Pair<TableName, LeaderElection<TableName>> election : elections) {
+ isLeader &= election.getRight().isLeader();
+ }
+ return isLeader;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public void close() throws Exception {
+ for (Pair<TableName, LeaderElection<TableName>> election : elections) {
+ election.getRight().close();
+ }
+ }
+ }
+
+ static class ReleaseAndRequireLease extends LeaseLeaderElection {
+ private static CountDownLatch latch;
+ private final Configuration configuration;
+ private final boolean needRenewLease;
+ private TableName tableName;
+
+ public static void setMonitor(CountDownLatch latch) {
+ ReleaseAndRequireLease.latch = latch;
+ }
+ public static void reset() {
+ ReleaseAndRequireLease.latch = null;
+ }
+
+ public ReleaseAndRequireLease(Configuration conf, boolean needRenewLease)
throws IOException {
+ super();
+ this.configuration = conf;
+ this.needRenewLease = needRenewLease;
+ }
+
+ @Override
+ public void setName(String name) {
+ super.setName(name);
+ LeaderElectionContext.TTYPE type = null;
+ for (LeaderElectionContext.TTYPE value :
LeaderElectionContext.TTYPE.values()) {
+ if (value.getName().equalsIgnoreCase(name)) {
+ type = value;
+ break;
+ }
+ }
+ if (type == null) {
+ // This shouldn't happen at all
+ throw new AssertionError("Unknown elector name: " + name);
+ }
+ this.tableName = type.getTableName();
+ }
+
+ @Override
+ protected void notifyListener() {
+ ScheduledExecutorService service = null;
+ if (!isLeader) {
+ try {
+ service = ThreadPool.getPool();
+ } catch (Exception ignored) {
+ }
+ }
+ super.notifyListener();
+ if (isLeader) {
+ if (!needRenewLease) {
+ super.shutdownWatcher();
+ // In our tests, the time spent on notifying the listener might be
greater than the lease timeout,
+ // which makes the leader loss the leadership quickly after wake up,
and kill all housekeeping services.
+ // Make sure the leader is still valid while notifying the listener,
and switch to ReleaseAndRequireWatcher
+ // after all listeners finish their work.
+ heartbeater = new ReleaseAndRequireWatcher(configuration, tableName);
+ heartbeater.startWatch();
+ }
+ } else {
+ if (service != null) {
+ // If the housekeeping task is running behind
+ Assert.assertTrue(service.isShutdown());
+ // Interrupt all sleeping tasks
+ service.shutdownNow();
+ try {
+ // This is the last one get notified, sleep some time to make sure
all other
+ // services have been stopped before return
+ Thread.sleep(12000);
+ } catch (InterruptedException ignore) {
+ }
+ }
+ }
+ if (latch != null) {
+ latch.countDown();
+ }
+ }
+
+ // For testing purpose only, lock would become timeout and then acquire it
again
+ private class ReleaseAndRequireWatcher extends LeaseWatcher {
+ long timeout;
+ public ReleaseAndRequireWatcher(Configuration conf,
+ TableName tableName) {
+ super(conf, tableName);
+ timeout = MetastoreConf.getTimeVar(conf,
+ MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS) + 3000;
+ setName("ReleaseAndRequireWatcher-" + ((name != null) ? name + "-" :
"") + ID.incrementAndGet());
+ }
+
+ @Override
+ public void beforeRun() {
+ try {
+ Thread.sleep(timeout);
+ } catch (InterruptedException e) {
+ // ignore this
+ }
+ }
+
+ @Override
+ public void runInternal() {
+ shutDown();
+ // The timeout lock should be cleaned,
+ // sleep some time to let others take the chance to become the leader
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ // Acquire the lock again
+ conf = new Configuration(conf);
+ reclaim();
+ }
+ }
+ }
+
+ public void checkHouseKeepingThreadExistence(boolean isLeader) throws
Exception {
+ searchHousekeepingThreads();
+
+ // Verify existence of threads
+ for (Map.Entry<String, Boolean> entry : threadNames.entrySet()) {
+ if (entry.getValue()) {
+ LOG.info("Found thread with name {}", entry.getKey());
+ } else {
+ LOG.info("No thread found with name {}", entry.getKey());
+ }
+ if (isLeader) {
+ Assert.assertTrue("No thread with name " + entry.getKey() + " found.",
entry.getValue());
+ } else {
+ Assert.assertFalse("Thread with name " + entry.getKey() + " found.",
entry.getValue());
+ }
+ }
+
+ for (Map.Entry<Class<? extends Thread>, Boolean> entry :
threadClasses.entrySet()) {
+ if (isLeader) {
+ if (entry.getValue()) {
+ LOG.info("Found thread for {}", entry.getKey().getSimpleName());
+ }
+ Assert.assertTrue("No thread found for class " +
entry.getKey().getSimpleName(), entry.getValue());
+ } else {
+ // A non-leader HMS will still run the configured number of Compaction
worker threads.
+ if (entry.getKey() == Worker.class) {
+ if (entry.getValue()) {
+ LOG.info("Thread found for " + entry.getKey().getSimpleName());
+ }
+ } else {
+ if (!entry.getValue()) {
+ LOG.info("No thread found for " + entry.getKey().getSimpleName());
+ }
+ Assert.assertFalse("Thread found for class " +
entry.getKey().getSimpleName(),
+ entry.getValue());
+ }
+ }
+ }
+ }
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingLeader.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingLeader.java
index 3df58ed2919..ae7cf125759 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingLeader.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingLeader.java
@@ -18,44 +18,24 @@
package org.apache.hadoop.hive.metastore;
-import org.junit.Assert;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.junit.Before;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
/**
* Test for specifying a valid hostname as HMS leader.
*/
public class TestMetastoreHousekeepingLeader extends
MetastoreHousekeepingLeaderTestBase {
- private static final Logger LOG =
LoggerFactory.getLogger(TestMetastoreHousekeepingLeader.class);
@Before
public void setUp() throws Exception {
- internalSetup("localhost", true);
+ setup("localhost", MetastoreConf.newMetastoreConf());
}
@Test
public void testHouseKeepingThreadExistence() throws Exception {
- searchHousekeepingThreads();
-
- // Verify existence of threads
- for (Map.Entry<String, Boolean> entry : threadNames.entrySet()) {
- if (entry.getValue()) {
- LOG.info("Found thread with name " + entry.getKey());
- }
- Assert.assertTrue("No thread with name " + entry.getKey() + " found.",
entry.getValue());
- }
-
- for (Map.Entry<Class<? extends Thread>, Boolean> entry :
threadClasses.entrySet()) {
- if (entry.getValue()) {
- LOG.info("Found thread for " + entry.getKey().getSimpleName());
- }
- Assert.assertTrue("No thread found for class " +
entry.getKey().getSimpleName(),
- entry.getValue());
- }
+ checkHouseKeepingThreadExistence(true);
}
+
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingLeaderEmptyConfig.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingLeaderEmptyConfig.java
index 7f05902cc8f..69f01a85c2c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingLeaderEmptyConfig.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingLeaderEmptyConfig.java
@@ -18,45 +18,24 @@
package org.apache.hadoop.hive.metastore;
-import org.junit.Assert;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.junit.Before;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
/**
* Test for specifying empty HMS leader.
*/
public class TestMetastoreHousekeepingLeaderEmptyConfig extends
MetastoreHousekeepingLeaderTestBase {
- private static final Logger LOG =
-
LoggerFactory.getLogger(TestMetastoreHousekeepingLeaderEmptyConfig.class);
@Before
public void setUp() throws Exception {
// Empty string for leader indicates that the HMS is leader.
- internalSetup("", true);
+ setup("", MetastoreConf.newMetastoreConf());
}
@Test
public void testHouseKeepingThreadExistence() throws Exception {
- searchHousekeepingThreads();
-
- // Verify existence of threads
- for (Map.Entry<String, Boolean> entry : threadNames.entrySet()) {
- if (entry.getValue()) {
- LOG.info("Found thread with name " + entry.getKey());
- }
- Assert.assertTrue("No thread with name " + entry.getKey() + " found.",
entry.getValue());
- }
-
- for (Map.Entry<Class<? extends Thread>, Boolean> entry :
threadClasses.entrySet()) {
- if (entry.getValue()) {
- LOG.info("Found thread for " + entry.getKey().getSimpleName());
- }
- Assert.assertTrue("No thread found for class " +
entry.getKey().getSimpleName(),
- entry.getValue());
- }
+ checkHouseKeepingThreadExistence(true);
}
+
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingNonLeader.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingNonLeader.java
index 4a0212d6860..1a113c9cc5c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingNonLeader.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreHousekeepingNonLeader.java
@@ -18,53 +18,24 @@
package org.apache.hadoop.hive.metastore;
-import org.apache.hadoop.hive.ql.txn.compactor.Worker;
-import org.junit.Assert;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.junit.Before;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
/**
* Test for specifying HMS leader other than the current one.
*/
public class TestMetastoreHousekeepingNonLeader extends
MetastoreHousekeepingLeaderTestBase {
- private static final Logger LOG =
-
LoggerFactory.getLogger(TestMetastoreHousekeepingLeaderEmptyConfig.class);
@Before
public void setUp() throws Exception {
// Empty string for leader indicates that the HMS is leader.
- internalSetup("some_non_leader_host.domain1.domain", true);
+ setup("some_non_leader_host.domain1.domain",
MetastoreConf.newMetastoreConf());
}
@Test
public void testHouseKeepingThreadExistence() throws Exception {
- searchHousekeepingThreads();
-
- // Verify existence of threads
- for (Map.Entry<String, Boolean> entry : threadNames.entrySet()) {
- if (!entry.getValue()) {
- LOG.info("No thread found with name " + entry.getKey());
- }
- Assert.assertFalse("Thread with name " + entry.getKey() + " found.",
entry.getValue());
- }
-
- for (Map.Entry<Class<? extends Thread>, Boolean> entry :
threadClasses.entrySet()) {
- // A non-leader HMS will still run the configured number of Compaction
worker threads.
- if (entry.getKey() == Worker.class) {
- if (entry.getValue()) {
- LOG.info("Thread found for " + entry.getKey().getSimpleName());
- }
- } else {
- if (!entry.getValue()) {
- LOG.info("No thread found for " + entry.getKey().getSimpleName());
- }
- Assert.assertFalse("Thread found for class " +
entry.getKey().getSimpleName(),
- entry.getValue());
- }
- }
+ checkHouseKeepingThreadExistence(false);
}
+
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreLeaseLeader.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreLeaseLeader.java
index 88231b3ebba..8e570c9b389 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreLeaseLeader.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreLeaseLeader.java
@@ -19,65 +19,65 @@
package org.apache.hadoop.hive.metastore;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.leader.LeaderElection;
import org.apache.hadoop.hive.metastore.leader.LeaderElectionContext;
-import org.apache.hadoop.hive.metastore.leader.LeaseLeaderElection;
+import org.apache.hadoop.hive.metastore.leader.LeaderElectionFactory;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-public class TestMetastoreLeaseLeader {
+public class TestMetastoreLeaseLeader extends
MetastoreHousekeepingLeaderTestBase {
- LeaderElection election;
-
- TestMetastoreHousekeepingLeader hms;
+ CombinedLeaderElector elector;
@Before
public void setUp() throws Exception {
- hms = new TestMetastoreHousekeepingLeader();
- MetastoreConf.setTimeVar(hms.conf, MetastoreConf.ConfVars.TXN_TIMEOUT, 3,
TimeUnit.SECONDS);
- MetastoreConf.setTimeVar(hms.conf,
MetastoreConf.ConfVars.LOCK_SLEEP_BETWEEN_RETRIES, 1, TimeUnit.SECONDS);
- hms.conf.setBoolean(LeaseLeaderElection.METASTORE_RENEW_LEASE, false);
- hms.conf.setBoolean(LeaderElectionContext.LEADER_IN_TEST, true);
- hms.conf.set("hive.txn.manager",
"org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
- hms.internalSetup("", false);
+ Configuration configuration = MetastoreConf.newMetastoreConf();
+ MetastoreConf.setTimeVar(configuration,
MetastoreConf.ConfVars.TXN_TIMEOUT, 3, TimeUnit.SECONDS);
+ MetastoreConf.setTimeVar(configuration,
MetastoreConf.ConfVars.LOCK_SLEEP_BETWEEN_RETRIES, 200, TimeUnit.MILLISECONDS);
+ MetastoreConf.setLongVar(configuration,
MetastoreConf.ConfVars.HMS_HANDLER_ATTEMPTS, 3);
+ MetastoreConf.setTimeVar(configuration,
MetastoreConf.ConfVars.HMS_HANDLER_INTERVAL, 100, TimeUnit.MILLISECONDS);
+ configuration.set("hive.txn.manager",
"org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+
LeaderElectionFactory.addElectionCreator(LeaderElectionFactory.Method.LOCK,
conf -> new ReleaseAndRequireLease(conf, false));
+ setup(null, configuration);
- Configuration conf = MetastoreConf.newMetastoreConf();
- MetastoreConf.setTimeVar(conf,
MetastoreConf.ConfVars.LOCK_SLEEP_BETWEEN_RETRIES, 1, TimeUnit.SECONDS);
- MetastoreConf.setTimeVar(conf, MetastoreConf.ConfVars.TXN_TIMEOUT, 3,
TimeUnit.SECONDS);
- MetastoreConf.setVar(conf,
MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_ELECTION, "lock");
- election = new LeaseLeaderElection();
- TableName tableName = (TableName)
LeaderElectionContext.getLeaderMutex(conf,
- LeaderElectionContext.TTYPE.HOUSEKEEPING, null);
- election.tryBeLeader(conf, tableName);
+ Configuration conf = new Configuration(configuration);
+ elector = new CombinedLeaderElector(conf);
+ elector.setName("TestMetastoreLeaseLeader");
+ elector.tryBeLeader();
}
@Test
public void testHouseKeepingThreads() throws Exception {
+ int size = LeaderElectionContext.TTYPE.values().length;
+ CountDownLatch latch = new CountDownLatch(size);
+
MetastoreHousekeepingLeaderTestBase.ReleaseAndRequireLease.setMonitor(latch);
// hms is the leader now
- hms.testHouseKeepingThreadExistence();
- assertFalse(election.isLeader());
- Thread.sleep(15 * 1000);
- // the lease of hms is timeout, election becomes leader now
- assertTrue(election.isLeader());
- try {
- // hms should shutdown all housekeeping tasks
- hms.testHouseKeepingThreadExistence();
- throw new IllegalStateException("HMS should shutdown all housekeeping
tasks");
- } catch (AssertionError e) {
- // expected
- }
+ checkHouseKeepingThreadExistence(true);
+ assertFalse(elector.isLeader());
+ latch.await();
+ // the lease of hms is timeout, the elector becomes leader now
+ assertTrue(elector.isLeader());
+ // hms should shut down all housekeeping tasks
+ checkHouseKeepingThreadExistence(false);
- election.close();
- Thread.sleep(10000);
+ latch = new CountDownLatch(size);
+
MetastoreHousekeepingLeaderTestBase.ReleaseAndRequireLease.setMonitor(latch);
+ elector.close();
+ latch.await();
// hms becomes leader again
- hms.testHouseKeepingThreadExistence();
+ checkHouseKeepingThreadExistence(true);
+ }
+
+ @After
+ public void afterTest() {
+ MetastoreHousekeepingLeaderTestBase.ReleaseAndRequireLease.reset();
}
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreLeaseNonLeader.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreLeaseNonLeader.java
index 6a340e4e661..0873c129454 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreLeaseNonLeader.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetastoreLeaseNonLeader.java
@@ -19,56 +19,54 @@
package org.apache.hadoop.hive.metastore;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.leader.LeaderElection;
import org.apache.hadoop.hive.metastore.leader.LeaderElectionContext;
-import org.apache.hadoop.hive.metastore.leader.LeaseLeaderElection;
+import org.apache.hadoop.hive.metastore.leader.LeaderElectionFactory;
import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertTrue;
-public class TestMetastoreLeaseNonLeader {
+public class TestMetastoreLeaseNonLeader extends
MetastoreHousekeepingLeaderTestBase {
- LeaderElection election;
-
- TestMetastoreHousekeepingLeader hms;
+ CombinedLeaderElector elector;
@Before
public void setUp() throws Exception {
Configuration conf = MetastoreConf.newMetastoreConf();
TestTxnDbUtil.setConfValues(conf);
TestTxnDbUtil.prepDb(conf);
- election = new LeaseLeaderElection();
- MetastoreConf.setVar(conf,
MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_ELECTION, "lock");
- TableName tableName = (TableName)
LeaderElectionContext.getLeaderMutex(conf,
- LeaderElectionContext.TTYPE.HOUSEKEEPING, null);
- election.tryBeLeader(conf, tableName);
- assertTrue("The elector should hold the lease now", election.isLeader());
+ elector = new CombinedLeaderElector(conf);
+ elector.setName("TestMetastoreLeaseNonLeader");
+ elector.tryBeLeader();
+ assertTrue("The elector should hold the lease now", elector.isLeader());
// start the non-leader hms now
- hms = new TestMetastoreHousekeepingLeader();
- MetastoreConf.setTimeVar(hms.conf,
MetastoreConf.ConfVars.LOCK_SLEEP_BETWEEN_RETRIES, 1, TimeUnit.SECONDS);
- hms.conf.setBoolean(LeaderElectionContext.LEADER_IN_TEST, true);
- hms.internalSetup("", false);
+ Configuration configuration = new Configuration(conf);
+ MetastoreConf.setTimeVar(configuration,
MetastoreConf.ConfVars.LOCK_SLEEP_BETWEEN_RETRIES, 100, TimeUnit.MILLISECONDS);
+
LeaderElectionFactory.addElectionCreator(LeaderElectionFactory.Method.LOCK, c
-> new ReleaseAndRequireLease(c, true));
+ setup(null, configuration);
}
@Test
public void testHouseKeepingThreads() throws Exception {
- try {
- hms.testHouseKeepingThreadExistence();
- throw new IllegalStateException("HMS shouldn't start any housekeeping
tasks");
- } catch (AssertionError e) {
- // expected
- }
+ checkHouseKeepingThreadExistence(false);
// elector releases the lease
- election.close();
- Thread.sleep(10 * 1000);
+ CountDownLatch latch = new
CountDownLatch(LeaderElectionContext.TTYPE.values().length);
+
MetastoreHousekeepingLeaderTestBase.ReleaseAndRequireLease.setMonitor(latch);
+ elector.close();
+ latch.await();
// housing threads are here now as the hms wins the election
- hms.testHouseKeepingThreadExistence();
+ checkHouseKeepingThreadExistence(true);
+ }
+
+ @After
+ public void afterTest() {
+ MetastoreHousekeepingLeaderTestBase.ReleaseAndRequireLease.reset();
}
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PersistenceManagerProvider.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PersistenceManagerProvider.java
index bf99745cc7d..0ea4f54b297 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PersistenceManagerProvider.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PersistenceManagerProvider.java
@@ -173,10 +173,10 @@ private static void logPropChanges(Properties newProps) {
return;
}
LOG.info("Updating the pmf due to property change");
- if (LOG.isDebugEnabled() && !newProps.equals(prop)) {
+ if (!newProps.equals(prop)) {
for (String key : prop.stringPropertyNames()) {
if (!key.equals(newProps.get(key))) {
- if (LOG.isDebugEnabled() && MetastoreConf.isPrintable(key)) {
+ if (MetastoreConf.isPrintable(key)) {
// The jdbc connection url can contain sensitive information like
username and password
// which should be masked out before logging.
String oldVal = prop.getProperty(key);
@@ -185,10 +185,10 @@ private static void logPropChanges(Properties newProps) {
oldVal = MetaStoreServerUtils.anonymizeConnectionURL(oldVal);
newVal = MetaStoreServerUtils.anonymizeConnectionURL(newVal);
}
- LOG.debug("Found {} to be different. Old val : {} : New Val : {}",
key,
+ LOG.warn("Found {} to be different. Old val : {} : New Val : {}",
key,
oldVal, newVal);
} else {
- LOG.debug("Found masked property {} to be different", key);
+ LOG.warn("Found masked property {} to be different", key);
}
}
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElection.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElection.java
index 5a6ab5d77bb..88d463e2abb 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElection.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElection.java
@@ -44,49 +44,49 @@ public interface LeaderElection<T> extends Closeable {
* It can be a path in Zookeeper, or a table that going to be
locked.
* @throws LeaderException
*/
- public void tryBeLeader(Configuration conf, T mutex)
+ void tryBeLeader(Configuration conf, T mutex)
throws LeaderException;
/**
* Getting the result of election.
* @return true if wins the election, false otherwise.
*/
- public boolean isLeader();
+ boolean isLeader();
/**
* Register listeners to get notified when leadership changes.
* @param listener The listener to be added
*/
- public void addStateListener(LeadershipStateListener listener);
+ void addStateListener(LeadershipStateListener listener);
/**
* Set the name of this leader candidate
* @param name the name
*/
- public void setName(String name);
+ void setName(String name);
/**
* Get the name of this leader candidate
*/
- public String getName();
+ String getName();
default boolean enforceMutex() {
return true;
}
- public interface LeadershipStateListener {
+ interface LeadershipStateListener {
/**
* Invoked when won the election.
* @param election the election where happens.
*/
- public void takeLeadership(LeaderElection election) throws Exception;
+ void takeLeadership(LeaderElection election) throws Exception;
/**
* Invoked when lost the election
* @param election the election where happens.
*/
- public void lossLeadership(LeaderElection election) throws Exception;
+ void lossLeadership(LeaderElection election) throws Exception;
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionContext.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionContext.java
index a3652d1c001..14d18e832db 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionContext.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionContext.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -44,8 +45,6 @@ public class LeaderElectionContext {
public enum TTYPE {
HOUSEKEEPING(new TableName(Warehouse.DEFAULT_CATALOG_NAME,
"__METASTORE_LEADER_ELECTION__",
"metastore_housekeeping"), "housekeeping"),
- WORKER(new TableName(Warehouse.DEFAULT_CATALOG_NAME,
"__METASTORE_LEADER_ELECTION__",
- "metastore_compactor_worker"), "compactor_worker"),
ALWAYS_TASKS(new TableName(Warehouse.DEFAULT_CATALOG_NAME,
"__METASTORE_LEADER_ELECTION__",
"metastore_always_tasks"), "always_tasks");
// Mutex of TTYPE, which can be a nonexistent table
@@ -74,9 +73,8 @@ public String getName() {
// State change listeners group by type
private final Map<TTYPE, List<LeadershipStateListener>> listeners;
// Collection of leader candidates
- private final List<LeaderElection> leaderElections = new ArrayList<>();
+ private final List<LeaderElection<?>> leaderElections = new ArrayList<>();
// Property for testing, a single leader will be created
- public final static String LEADER_IN_TEST =
"metastore.leader.election.in.test";
private LeaderElectionContext(String servHost, Configuration conf,
Map<TTYPE, List<LeadershipStateListener>> listeners,
@@ -97,40 +95,32 @@ private LeaderElectionContext(String servHost,
Configuration conf,
}
public void start() throws Exception {
- Map<TTYPE, List<LeadershipStateListener>> listenerMap = this.listeners;
- if (conf.getBoolean(LEADER_IN_TEST, false)) {
- Map<TTYPE, List<LeadershipStateListener>> newListeners = new HashMap<>();
- newListeners.put(TTYPE.HOUSEKEEPING, new ArrayList<>());
- listenerMap.forEach((k, v) ->
newListeners.get(TTYPE.HOUSEKEEPING).addAll(v));
- listenerMap = newListeners;
- }
- for (Map.Entry<TTYPE, List<LeadershipStateListener>> entry :
- listenerMap.entrySet()) {
- List<LeadershipStateListener> listenerList = entry.getValue();
+ List<TTYPE> ttypes = new ArrayList<>(listeners.keySet());
+ Collections.shuffle(ttypes);
+ for (TTYPE ttype : ttypes) {
+ List<LeadershipStateListener> listenerList = listeners.get(ttype);
if (listenerList.isEmpty()) {
continue;
}
if (auditLeaderListener != null) {
- listenerList.add(0, auditLeaderListener);
+ listenerList.addFirst(auditLeaderListener);
}
- TTYPE ttype = entry.getKey();
LeaderElection leaderElection = LeaderElectionFactory.create(conf);
leaderElection.setName(ttype.name);
- listenerList.forEach(listener ->
leaderElection.addStateListener(listener));
+ listenerList.forEach(leaderElection::addStateListener);
leaderElections.add(leaderElection);
-
Thread daemon = new Thread(() -> {
try {
- Object mutex = getLeaderMutex(conf, ttype, servHost);
+ Object mutex = LeaderElectionFactory.getMutex(conf, ttype, servHost);
leaderElection.tryBeLeader(conf, mutex);
} catch (LeaderException e) {
throw new RuntimeException("Error claiming to be leader: " +
leaderElection.getName(), e);
}
});
- daemon.setName("Metastore Election " + leaderElection.getName());
- daemon.setDaemon(true);
if (startAsDaemon) {
+ daemon.setName("Metastore Election " + leaderElection.getName());
+ daemon.setDaemon(true);
daemon.start();
} else {
daemon.run();
@@ -148,25 +138,6 @@ public void close() {
});
}
- public static Object getLeaderMutex(Configuration conf, TTYPE ttype, String servHost) {
- String method =
- MetastoreConf.getVar(conf, MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_ELECTION);
- switch (method.toLowerCase()) {
- case "host":
- return servHost;
- case "lock":
- TableName mutex = ttype.getTableName();
- String namespace =
- MetastoreConf.getVar(conf, MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_LOCK_NAMESPACE);
- if (StringUtils.isNotEmpty(namespace)) {
- return new TableName(mutex.getCat(), namespace, mutex.getTable());
- }
- return mutex;
- default:
- throw new UnsupportedOperationException(method + " not supported for leader election");
- }
- }
-
public static class ContextBuilder {
private Configuration configuration;
private boolean startAsDaemon;
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionFactory.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionFactory.java
index 5055ad8a003..ab4ca8bc4fa 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionFactory.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionFactory.java
@@ -18,27 +18,78 @@
package org.apache.hadoop.hive.metastore.leader;
+import com.google.common.annotations.VisibleForTesting;
+
import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.lang3.EnumUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
/**
* Simple factory for creating the elector
*/
public class LeaderElectionFactory {
+ public enum Method {
+ HOST, LOCK
+ }
+
+ private static final Map<Method, ElectionCreator> ELECTION_CREATOR_MAP = new ConcurrentHashMap<>();
+ static {
+ addElectionCreator(Method.HOST, conf -> new StaticLeaderElection());
+ addElectionCreator(Method.LOCK, conf -> new LeaseLeaderElection());
+ }
+
+ private LeaderElectionFactory() {
+ throw new AssertionError("The constructor shouldn't be called");
+ }
+
+ public static LeaderElection create(Configuration conf) throws IOException {
+ String method =
+ MetastoreConf.getVar(conf, MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_ELECTION);
+ Method m = EnumUtils.getEnum(Method.class, method.toUpperCase());
+ ElectionCreator creator = null;
+ if (m != null) {
+ creator = ELECTION_CREATOR_MAP.get(m);
+ }
+ if (creator == null) {
+ throw new UnsupportedOperationException(method + " not supported for electing the leader");
+ }
+ return creator.createElector(conf);
+ }
- public static LeaderElection create(Configuration conf) throws IOException {
+ public static Object getMutex(Configuration conf, LeaderElectionContext.TTYPE ttype, String servHost) {
String method =
MetastoreConf.getVar(conf,
MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_ELECTION);
- switch (method.toLowerCase()) {
- case "host":
- return new StaticLeaderElection();
- case "lock":
- return new LeaseLeaderElection();
- default:
- throw new UnsupportedOperationException(method + " is not supported for electing the leader");
+ Method m = EnumUtils.getEnum(Method.class, method.toUpperCase());
+ if (m != null) {
+ switch (m) {
+ case HOST:
+ return servHost;
+ case LOCK:
+ TableName mutex = ttype.getTableName();
+ String namespace =
+ MetastoreConf.getVar(conf, MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_LOCK_NAMESPACE);
+ if (StringUtils.isNotEmpty(namespace)) {
+ return new TableName(mutex.getCat(), namespace, mutex.getTable());
+ }
+ return mutex;
+ }
}
+ throw new UnsupportedOperationException(method + " not supported for leader election");
+ }
+
+ @VisibleForTesting
+ public static void addElectionCreator(Method method, ElectionCreator creator) {
+ ELECTION_CREATOR_MAP.put(method, creator);
+ }
+
+ public interface ElectionCreator {
+ LeaderElection createElector(Configuration conf) throws IOException;
}
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java
index fc4d4078df2..ee635a6f9f5 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java
@@ -64,10 +64,10 @@ public class LeaseLeaderElection implements
LeaderElection<TableName> {
private static final Logger LOG =
LoggerFactory.getLogger(LeaseLeaderElection.class);
- private static final AtomicLong ID = new AtomicLong();
+ protected static final AtomicLong ID = new AtomicLong();
// Result of election
- private volatile boolean isLeader;
+ protected volatile boolean isLeader;
private TxnStore store;
@@ -76,7 +76,7 @@ public class LeaseLeaderElection implements
LeaderElection<TableName> {
// A daemon used for renewing the lock before timeout,
// this happens when the current instance wins the election.
- private LeaseWatcher heartbeater;
+ protected LeaseWatcher heartbeater;
// For non-leader instances to check the lock periodically to
// see if there is a chance to take over the leadership.
@@ -86,15 +86,14 @@ public class LeaseLeaderElection implements
LeaderElection<TableName> {
// Current lock id
private volatile long lockId = -1;
- // Leadership change listeners
- private List<LeadershipStateListener> listeners = new ArrayList<>();
+ private volatile boolean stopped = false;
- // Property for testing only
- public static final String METASTORE_RENEW_LEASE = "metastore.renew.leader.lease";
+ // Leadership change listeners
+ private final List<LeadershipStateListener> listeners = new ArrayList<>();
- private String name;
- private String userName;
- private String hostName;
+ protected String name;
+ private final String userName;
+ private final String hostName;
private boolean enforceMutex;
public LeaseLeaderElection() throws IOException {
@@ -111,10 +110,8 @@ private synchronized void doWork(LockResponse resp,
Configuration conf,
switch (resp.getState()) {
case ACQUIRED:
- boolean renewLease = conf.getBoolean(METASTORE_RENEW_LEASE, true);
- heartbeater = renewLease ?
- new Heartbeater(conf, tableName) : new ReleaseAndRequireWatcher(conf, tableName);
- heartbeater.perform();
+ heartbeater = new Heartbeater(conf, tableName);
+ heartbeater.startWatch();
if (!isLeader) {
isLeader = true;
notifyListener();
@@ -122,7 +119,7 @@ private synchronized void doWork(LockResponse resp,
Configuration conf,
break;
case WAITING:
nonLeaderWatcher = new NonLeaderWatcher(conf, tableName);
- nonLeaderWatcher.perform();
+ nonLeaderWatcher.startWatch();
if (isLeader) {
isLeader = false;
notifyListener();
@@ -134,7 +131,7 @@ private synchronized void doWork(LockResponse resp,
Configuration conf,
LOG.debug("Spent {}ms to notify the listeners, isLeader: {}",
System.currentTimeMillis() - start, isLeader);
}
- private void notifyListener() {
+ protected void notifyListener() {
listeners.forEach(listener -> {
try {
if (isLeader) {
@@ -143,8 +140,7 @@ private void notifyListener() {
listener.lossLeadership(this);
}
} catch (Exception e) {
- LOG.error("Error notifying the listener: " + listener +
- ", leader: " + isLeader, e);
+ LOG.error("Error notifying the listener: {}, leader: {}", listener, isLeader, e);
}
});
}
@@ -157,14 +153,15 @@ public void tryBeLeader(Configuration conf, TableName
table) throws LeaderExcept
if (store == null) {
store = TxnUtils.getTxnStore(conf);
}
- LockComponent component = new LockComponentBuilder()
- .setDbName(table.getDb())
- .setTableName(table.getTable())
- .setLock(LockType.EXCL_WRITE)
- .setOperationType(DataOperationType.NO_TXN)
- .build();
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(component);
+
+ List<LockComponent> components = new ArrayList<>();
+ components.add(
+ new LockComponentBuilder()
+ .setDbName(table.getDb())
+ .setTableName(table.getTable())
+ .setLock(LockType.EXCL_WRITE)
+ .setOperationType(DataOperationType.NO_TXN)
+ .build());
boolean lockable = false;
Exception recentException = null;
@@ -173,28 +170,27 @@ public void tryBeLeader(Configuration conf, TableName
table) throws LeaderExcept
int numRetries = MetastoreConf.getIntVar(conf,
MetastoreConf.ConfVars.LOCK_NUMRETRIES);
long maxSleep = MetastoreConf.getTimeVar(conf,
MetastoreConf.ConfVars.LOCK_SLEEP_BETWEEN_RETRIES,
TimeUnit.MILLISECONDS);
- for (int i = 0; i < numRetries; i++) {
+ for (int i = 0; i < numRetries && !stopped; i++) {
try {
LockResponse res = store.lock(req);
if (res.getState() == LockState.WAITING || res.getState() ==
LockState.ACQUIRED) {
lockable = true;
+ LOG.debug("{} Spent {}ms to take part in election, retries: {}", getName(), System.currentTimeMillis() - start, i);
doWork(res, conf, table);
- LOG.debug("Spent {}ms to lock the table {}, retries: {}", System.currentTimeMillis() - start, table, i);
break;
}
} catch (NoSuchTxnException | TxnAbortedException e) {
throw new AssertionError("This should not happen, we didn't open txn",
e);
} catch (MetaException e) {
recentException = e;
- LOG.warn("Error while locking the table: {}, num retries: {}," +
- " max retries: {}, exception: {}", table, i, numRetries, e);
+ LOG.warn("Error while locking the table: {}, num retries: {}, max retries: {}",
+ table, i, numRetries, e);
}
backoff(maxSleep);
}
if (!lockable) {
throw new LeaderException("Error locking the table: " + table + " in " +
numRetries +
- " retries, time spent: " + (System.currentTimeMillis() - start) + " ms",
- recentException);
+ " retries, time spent: " + (System.currentTimeMillis() - start) + " ms", recentException);
}
}
@@ -206,11 +202,11 @@ private void backoff(long maxSleep) {
nextSleep = maxSleep;
try {
Thread.sleep(nextSleep);
- } catch (InterruptedException e) {
+ } catch (InterruptedException ignored) {
}
}
- private void shutdownWatcher() {
+ protected void shutdownWatcher() {
if (heartbeater != null) {
heartbeater.shutDown();
heartbeater = null;
@@ -232,7 +228,7 @@ public boolean isLeader() {
return isLeader;
}
- private abstract class LeaseWatcher extends Thread {
+ protected abstract class LeaseWatcher extends Thread {
protected Configuration conf;
@@ -240,17 +236,17 @@ private abstract class LeaseWatcher extends Thread {
private volatile boolean stopped = false;
- LeaseWatcher(Configuration conf, TableName tableName) {
+ protected LeaseWatcher(Configuration conf, TableName tableName) {
this.conf = conf;
this.tableName = tableName;
setDaemon(true);
- StringBuilder builder = new StringBuilder("Leader-Watcher-")
- .append(name != null ? name : "")
+ StringBuilder builder = new StringBuilder("Lease-Watcher-")
+ .append(name != null ? name + "-" : "")
.append(ID.incrementAndGet());
setName(builder.toString());
}
- public void perform() {
+ public void startWatch() {
LOG.info("Starting a watcher: {} for {}", getClass().getName(), name);
start();
}
@@ -275,11 +271,9 @@ public void shutDown() {
}
public void beforeRun() {
- // do nothing
}
public void afterRun() {
- // do nothing
}
public abstract void runInternal();
@@ -288,15 +282,15 @@ public void reclaim() {
try {
tryBeLeader(conf, tableName);
} catch (Exception e) {
- LOG.error("Error reclaiming the leader, will retry in next cycle", e);
+ LOG.error("Error reclaiming the lease, will retry in next cycle", e);
}
}
}
private class NonLeaderWatcher extends LeaseWatcher {
- private long sleep;
+ private final long sleep;
private int count;
- private CheckLockRequest request;
+ private final CheckLockRequest request;
NonLeaderWatcher(Configuration conf, TableName table) {
super(conf, table);
@@ -348,8 +342,8 @@ public void afterRun() {
}
private class Heartbeater extends LeaseWatcher {
- private HeartbeatRequest req;
- private long heartbeatInterval;
+ private final HeartbeatRequest req;
+ private final long heartbeatInterval;
Heartbeater(Configuration conf, TableName table) {
super(conf, table);
@@ -389,7 +383,7 @@ public void runInternal() {
reclaim();
} catch (Exception e) {
// Wait for next cycle.
- LOG.warn("Heartbeat failed with exception: " + e.getMessage(), e);
+ LOG.warn("Heartbeat failed with exception: {}", e.getMessage(), e);
}
}
@@ -403,45 +397,9 @@ public void afterRun() {
}
}
- // For testing purpose only, lock would become timeout and then acquire it again
- private class ReleaseAndRequireWatcher extends LeaseWatcher {
- long timeout;
- public ReleaseAndRequireWatcher(Configuration conf,
- TableName tableName) {
- super(conf, tableName);
- timeout = MetastoreConf.getTimeVar(conf,
- MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS) + 3000;
- setName("ReleaseAndRequireWatcher");
- }
-
- @Override
- public void beforeRun() {
- try {
- Thread.sleep(timeout);
- } catch (InterruptedException e) {
- // ignore this
- }
- }
-
- @Override
- public void runInternal() {
- shutDown();
- // The timeout lock should be cleaned,
- // sleep some time to let others take the chance to become the leader
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- // ignore
- }
- // Acquire the lock again
- conf = new Configuration(conf);
- conf.setBoolean(METASTORE_RENEW_LEASE, true);
- reclaim();
- }
- }
-
@Override
public void close() {
+ stopped = true;
shutdownWatcher();
if (isLeader) {
isLeader = false;
@@ -454,15 +412,11 @@ public void close() {
} catch (NoSuchLockException | TxnOpenException e) {
// ignore
} catch (Exception e) {
- LOG.error("Error while unlocking: " + lockId, e);
+ LOG.error("Error while unlocking: {}", lockId, e);
}
}
}
- public long getLockId() {
- return lockId;
- }
-
@Override
public void setName(String name) {
this.name = name;
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 13f32f64630..b77dd08601e 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -388,6 +388,7 @@ LockResponse lock(LockRequest rqst)
* @throws TxnAbortedException
* @throws MetaException
*/
+ @SqlRetry(lockInternally = true)
@Transactional(POOL_TX)
@RetrySemantics.SafeToRetry
LockResponse checkLock(CheckLockRequest rqst)
@@ -425,7 +426,7 @@ void unlock(UnlockRequest rqst)
* @throws TxnAbortedException
* @throws MetaException
*/
- @SqlRetry
+ @SqlRetry(lockInternally = true)
@Transactional(POOL_TX)
@RetrySemantics.SafeToRetry
void heartbeat(HeartbeatRequest ids)
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/leader/TestLeaderElection.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/leader/TestLeaderElection.java
index 59f7dbc8fd6..c3d24cd2dce 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/leader/TestLeaderElection.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/leader/TestLeaderElection.java
@@ -20,16 +20,10 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.TableName;
-import org.apache.hadoop.hive.metastore.api.UnlockRequest;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.txn.TxnStore;
-import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
import org.junit.Test;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -40,7 +34,7 @@ public class TestLeaderElection {
@Test
public void testConfigLeaderElection() throws Exception {
- LeaderElection election = new StaticLeaderElection();
+ LeaderElection<String> election = new StaticLeaderElection();
String leaderHost = "host1.work";
Configuration configuration = MetastoreConf.newMetastoreConf();
election.tryBeLeader(configuration, leaderHost);
@@ -87,9 +81,7 @@ public void testLeaseLeaderElection() throws Exception {
MetastoreConf.setBoolVar(configuration,
MetastoreConf.ConfVars.HIVE_IN_TEST, true);
TestTxnDbUtil.setConfValues(configuration);
TestTxnDbUtil.prepDb(configuration);
- TxnStore txnStore = TxnUtils.getTxnStore(configuration);
- configuration.setBoolean(LeaseLeaderElection.METASTORE_RENEW_LEASE, false);
TableName mutex = new TableName("hive", "default", "leader_lease_ms");
LeaseLeaderElection instance1 = new LeaseLeaderElection();
AtomicBoolean flag1 = new AtomicBoolean(false);
@@ -98,7 +90,6 @@ public void testLeaseLeaderElection() throws Exception {
// elect1 as a leader now
assertTrue(flag1.get() && instance1.isLeader());
- configuration.setBoolean(LeaseLeaderElection.METASTORE_RENEW_LEASE, true);
LeaseLeaderElection instance2 = new LeaseLeaderElection();
AtomicBoolean flag2 = new AtomicBoolean(false);
instance2.addStateListener(new TestLeaderListener(flag2));
@@ -106,52 +97,14 @@ public void testLeaseLeaderElection() throws Exception {
// instance2 should not be leader as elect1 holds the lease
assertFalse(flag2.get() || instance2.isLeader());
- ExecutorService service = Executors.newFixedThreadPool(4);
- wait(service, flag1, flag2);
- // now instance1 lease is timeout, the instance2 should be leader now
+ instance1.close();
+ synchronized (flag2) {
+ flag2.wait();
+ }
+ // now instance1 lease is released, the instance2 should be leader now
assertTrue(instance2.isLeader() && flag2.get());
assertFalse(flag1.get() || instance1.isLeader());
assertTrue(flag2.get() && instance2.isLeader());
-
- // remove leader's lease (instance2)
- long lockId2 = instance2.getLockId();
- txnStore.unlock(new UnlockRequest(lockId2));
- wait(service, flag1, flag2);
- assertFalse(flag2.get() || instance2.isLeader());
- assertTrue(lockId2 > 0);
- assertFalse(instance2.getLockId() == lockId2);
-
- // remove leader's lease(instance1)
- long lockId1 = instance1.getLockId();
- txnStore.unlock(new UnlockRequest(lockId1));
- wait(service, flag1, flag2);
- assertFalse(lockId1 == instance1.getLockId());
- assertTrue(lockId1 > 0);
-
- for (int i = 0; i < 10; i++) {
- assertFalse(flag1.get() || instance1.isLeader());
- assertTrue(flag2.get() && instance2.isLeader());
- Thread.sleep(1 * 1000);
- }
- }
-
- private void wait(ExecutorService service, Object... obj) throws Exception {
- Future[] fs = new Future[obj.length];
- for (int i = 0; i < obj.length; i++) {
- Object monitor = obj[i];
- fs[i] = service.submit(() -> {
- try {
- synchronized (monitor) {
- monitor.wait();
- }
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- });
- }
- for (Future f : fs) {
- f.get();
- }
}
}