This is an automated email from the ASF dual-hosted git repository.
dengzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 1097dde68d8 HIVE-28001: Fix the flaky test TestLeaderElection (#5011)
(Zhihua Deng, reviewed by Sai Hemanth Gantasala)
1097dde68d8 is described below
commit 1097dde68d829ce3863ee667afb9c6332b7f2f0a
Author: dengzh <[email protected]>
AuthorDate: Thu Jan 18 14:12:19 2024 +0800
HIVE-28001: Fix the flaky test TestLeaderElection (#5011) (Zhihua Deng,
reviewed by Sai Hemanth Gantasala)
---
.../hadoop/hive/metastore/conf/MetastoreConf.java | 5 +-
.../metastore/leader/LeaderElectionContext.java | 25 ++++---
.../metastore/leader/LeaderElectionFactory.java | 6 +-
.../hive/metastore/leader/LeaseLeaderElection.java | 25 ++++---
.../hive/metastore/leader/TestLeaderElection.java | 78 ++++++++++++++--------
5 files changed, 92 insertions(+), 47 deletions(-)
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index ffa3d6a06c5..792e27fc99c 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -580,7 +580,7 @@ public class MetastoreConf {
"metastore.housekeeping.leader.election",
"host", new StringSetValidator("host", "lock"),
"Set to host, HMS will choose the leader by the configured
metastore.housekeeping.leader.hostname.\n" +
- "Set to lock, HMS will use the hive lock to elect the leader."),
+ "Set to lock, HMS will use the Hive lock to elect the leader."),
METASTORE_HOUSEKEEPING_LEADER_AUDITTABLE("metastore.housekeeping.leader.auditTable",
"metastore.housekeeping.leader.auditTable", "",
"Audit the leader election event to a plain json table when
configured."),
@@ -593,6 +593,9 @@ public class MetastoreConf {
"metastore.housekeeping.leader.auditFiles.limit", 10,
"Limit the number of small audit files when
metastore.housekeeping.leader.newAuditFile is true.\n" +
"If the number of audit files exceeds the limit, then the oldest will
be deleted."),
+
METASTORE_HOUSEKEEPING_LEADER_LOCK_NAMESPACE("metastore.housekeeping.leader.lock.namespace",
+ "metastore.housekeeping.leader.lock.namespace", "",
+ "The database where the Hive lock sits when
metastore.housekeeping.leader.election is set to lock."),
METASTORE_HOUSEKEEPING_THREADS_ON("metastore.housekeeping.threads.on",
"hive.metastore.housekeeping.threads.on", false,
"Whether to run the tasks under metastore.task.threads.remote on this
metastore instance or not.\n" +
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionContext.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionContext.java
index 3c01d8030ca..a3652d1c001 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionContext.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionContext.java
@@ -42,12 +42,12 @@ public class LeaderElectionContext {
* For those tasks which belong to the same type, they will be running in
the same leader.
*/
public enum TTYPE {
- HOUSEKEEPING(new TableName(Warehouse.DEFAULT_CATALOG_NAME, "sys",
- "metastore_housekeeping_leader"), "housekeeping"),
- WORKER(new TableName(Warehouse.DEFAULT_CATALOG_NAME, "sys",
- "metastore_worker_leader"), "compactor_worker"),
- ALWAYS_TASKS(new TableName(Warehouse.DEFAULT_CATALOG_NAME, "sys",
- "metastore_always_tasks_leader"), "always_tasks");
+ HOUSEKEEPING(new TableName(Warehouse.DEFAULT_CATALOG_NAME,
"__METASTORE_LEADER_ELECTION__",
+ "metastore_housekeeping"), "housekeeping"),
+ WORKER(new TableName(Warehouse.DEFAULT_CATALOG_NAME,
"__METASTORE_LEADER_ELECTION__",
+ "metastore_compactor_worker"), "compactor_worker"),
+ ALWAYS_TASKS(new TableName(Warehouse.DEFAULT_CATALOG_NAME,
"__METASTORE_LEADER_ELECTION__",
+ "metastore_always_tasks"), "always_tasks");
// Mutex of TTYPE, which can be a nonexistent table
private final TableName mutex;
// Name of TTYPE
@@ -127,9 +127,10 @@ public class LeaderElectionContext {
throw new RuntimeException("Error claiming to be leader: " +
leaderElection.getName(), e);
}
});
+ daemon.setName("Metastore Election " + leaderElection.getName());
+ daemon.setDaemon(true);
+
if (startAsDaemon) {
- daemon.setName("Leader-Election-" + leaderElection.getName());
- daemon.setDaemon(true);
daemon.start();
} else {
daemon.run();
@@ -154,7 +155,13 @@ public class LeaderElectionContext {
case "host":
return servHost;
case "lock":
- return ttype.getTableName();
+ TableName mutex = ttype.getTableName();
+ String namespace =
+ MetastoreConf.getVar(conf,
MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_LOCK_NAMESPACE);
+ if (StringUtils.isNotEmpty(namespace)) {
+ return new TableName(mutex.getCat(), namespace, mutex.getTable());
+ }
+ return mutex;
default:
throw new UnsupportedOperationException(method + " not supported for
leader election");
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionFactory.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionFactory.java
index 51dce28ac50..5055ad8a003 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionFactory.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionFactory.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hive.metastore.leader;
+import java.io.IOException;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
@@ -26,7 +28,7 @@ import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
*/
public class LeaderElectionFactory {
- public static LeaderElection create(Configuration conf) {
+ public static LeaderElection create(Configuration conf) throws IOException {
String method =
MetastoreConf.getVar(conf,
MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_ELECTION);
switch (method.toLowerCase()) {
@@ -35,7 +37,7 @@ public class LeaderElectionFactory {
case "lock":
return new LeaseLeaderElection();
default:
- throw new UnsupportedOperationException("Do not support " + method + "
now");
+ throw new UnsupportedOperationException(method + " is not supported
for electing the leader");
}
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java
index 937174f5d92..d6ad76dcce9 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
@@ -92,9 +93,17 @@ public class LeaseLeaderElection implements LeaderElection<TableName> {
public static final String METASTORE_RENEW_LEASE =
"metastore.renew.leader.lease";
private String name;
+ private String userName;
+ private String hostName;
- private void doWork(LockResponse resp, Configuration conf,
+ public LeaseLeaderElection() throws IOException {
+ userName = SecurityUtils.getUser();
+ hostName = InetAddress.getLocalHost().getHostName();
+ }
+
+ private synchronized void doWork(LockResponse resp, Configuration conf,
TableName tableName) throws LeaderException {
+ long start = System.currentTimeMillis();
lockId = resp.getLockid();
assert resp.getState() == LockState.ACQUIRED || resp.getState() ==
LockState.WAITING;
shutdownWatcher();
@@ -121,6 +130,7 @@ public class LeaseLeaderElection implements LeaderElection<TableName> {
default:
throw new IllegalStateException("Unexpected lock state: " +
resp.getState());
}
+ LOG.debug("Spent {}ms to notify the listeners, isLeader: {}",
System.currentTimeMillis() - start, isLeader);
}
private void notifyListener() {
@@ -142,13 +152,6 @@ public class LeaseLeaderElection implements LeaderElection<TableName> {
public void tryBeLeader(Configuration conf, TableName table) throws
LeaderException {
requireNonNull(conf, "conf is null");
requireNonNull(table, "table is null");
- String user, hostName;
- try {
- user = SecurityUtils.getUser();
- hostName = InetAddress.getLocalHost().getHostName();
- } catch (Exception e) {
- throw new LeaderException("Error while getting the username", e);
- }
if (store == null) {
store = TxnUtils.getTxnStore(conf);
@@ -165,7 +168,7 @@ public class LeaseLeaderElection implements LeaderElection<TableName> {
boolean lockable = false;
Exception recentException = null;
long start = System.currentTimeMillis();
- LockRequest req = new LockRequest(components, user, hostName);
+ LockRequest req = new LockRequest(components, userName, hostName);
int numRetries = MetastoreConf.getIntVar(conf,
MetastoreConf.ConfVars.LOCK_NUMRETRIES);
long maxSleep = MetastoreConf.getTimeVar(conf,
MetastoreConf.ConfVars.LOCK_SLEEP_BETWEEN_RETRIES,
TimeUnit.MILLISECONDS);
@@ -175,6 +178,7 @@ public class LeaseLeaderElection implements LeaderElection<TableName> {
if (res.getState() == LockState.WAITING || res.getState() ==
LockState.ACQUIRED) {
lockable = true;
doWork(res, conf, table);
+ LOG.debug("Spent {}ms to lock the table {}, retries: {}",
System.currentTimeMillis() - start, table, i);
break;
}
} catch (NoSuchTxnException | TxnAbortedException e) {
@@ -324,6 +328,7 @@ public class LeaseLeaderElection implements LeaderElection<TableName> {
} catch (NoSuchTxnException | TxnAbortedException e) {
throw new AssertionError("This should not happen, we didn't open txn",
e);
} catch (NoSuchLockException e) {
+ LOG.info("No such lock {} for NonLeaderWatcher, try to obtain the lock
again...", lockId);
reclaim();
} catch (Exception e) {
// Wait for next cycle.
@@ -379,6 +384,7 @@ public class LeaseLeaderElection implements LeaderElection<TableName> {
} catch (NoSuchTxnException | TxnAbortedException e) {
throw new AssertionError("This should not happen, we didn't open txn",
e);
} catch (NoSuchLockException e) {
+ LOG.info("No such lock {} for Heartbeater, try to obtain the lock
again...", lockId);
reclaim();
} catch (Exception e) {
// Wait for next cycle.
@@ -404,6 +410,7 @@ public class LeaseLeaderElection implements LeaderElection<TableName> {
super(conf, tableName);
timeout = MetastoreConf.getTimeVar(conf,
MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS) + 3000;
+ setName("ReleaseAndRequireWatcher");
}
@Override
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/leader/TestLeaderElection.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/leader/TestLeaderElection.java
index 99d13c00b4a..59f7dbc8fd6 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/leader/TestLeaderElection.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/leader/TestLeaderElection.java
@@ -27,6 +27,9 @@ import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
import org.junit.Test;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -52,6 +55,28 @@ public class TestLeaderElection {
assertFalse(election.isLeader());
}
+ static class TestLeaderListener implements
LeaderElection.LeadershipStateListener {
+ AtomicBoolean flag;
+ TestLeaderListener(AtomicBoolean flag) {
+ this.flag = flag;
+ }
+ @Override
+ public void takeLeadership(LeaderElection election) throws Exception {
+ synchronized (flag) {
+ flag.set(true);
+ flag.notifyAll();
+ }
+ }
+
+ @Override
+ public void lossLeadership(LeaderElection election) throws Exception {
+ synchronized (flag) {
+ flag.set(false);
+ flag.notifyAll();
+ }
+ }
+ }
+
@Test
public void testLeaseLeaderElection() throws Exception {
Configuration configuration = MetastoreConf.newMetastoreConf();
@@ -68,16 +93,7 @@ public class TestLeaderElection {
TableName mutex = new TableName("hive", "default", "leader_lease_ms");
LeaseLeaderElection instance1 = new LeaseLeaderElection();
AtomicBoolean flag1 = new AtomicBoolean(false);
- instance1.addStateListener(new LeaderElection.LeadershipStateListener() {
- @Override
- public void takeLeadership(LeaderElection election) {
- flag1.set(true);
- }
- @Override
- public void lossLeadership(LeaderElection election) {
- flag1.set(false);
- }
- });
+ instance1.addStateListener(new TestLeaderListener(flag1));
instance1.tryBeLeader(configuration, mutex);
// elect1 as a leader now
assertTrue(flag1.get() && instance1.isLeader());
@@ -85,31 +101,22 @@ public class TestLeaderElection {
configuration.setBoolean(LeaseLeaderElection.METASTORE_RENEW_LEASE, true);
LeaseLeaderElection instance2 = new LeaseLeaderElection();
AtomicBoolean flag2 = new AtomicBoolean(false);
- instance2.addStateListener(new LeaderElection.LeadershipStateListener() {
- @Override
- public void takeLeadership(LeaderElection election) {
- flag2.set(true);
- }
- @Override
- public void lossLeadership(LeaderElection election) {
- flag2.set(false);
- }
- });
+ instance2.addStateListener(new TestLeaderListener(flag2));
instance2.tryBeLeader(configuration, mutex);
-
// instance2 should not be leader as elect1 holds the lease
assertFalse(flag2.get() || instance2.isLeader());
- Thread.sleep(15 * 1000);
+
+ ExecutorService service = Executors.newFixedThreadPool(4);
+ wait(service, flag1, flag2);
// now instance1 lease is timeout, the instance2 should be leader now
assertTrue(instance2.isLeader() && flag2.get());
-
assertFalse(flag1.get() || instance1.isLeader());
assertTrue(flag2.get() && instance2.isLeader());
+
// remove leader's lease (instance2)
long lockId2 = instance2.getLockId();
txnStore.unlock(new UnlockRequest(lockId2));
- Thread.sleep(4 * 1000);
- assertTrue(flag1.get() && instance1.isLeader());
+ wait(service, flag1, flag2);
assertFalse(flag2.get() || instance2.isLeader());
assertTrue(lockId2 > 0);
assertFalse(instance2.getLockId() == lockId2);
@@ -117,7 +124,7 @@ public class TestLeaderElection {
// remove leader's lease(instance1)
long lockId1 = instance1.getLockId();
txnStore.unlock(new UnlockRequest(lockId1));
- Thread.sleep(4 * 1000);
+ wait(service, flag1, flag2);
assertFalse(lockId1 == instance1.getLockId());
assertTrue(lockId1 > 0);
@@ -128,4 +135,23 @@ public class TestLeaderElection {
}
}
+ private void wait(ExecutorService service, Object... obj) throws Exception {
+ Future[] fs = new Future[obj.length];
+ for (int i = 0; i < obj.length; i++) {
+ Object monitor = obj[i];
+ fs[i] = service.submit(() -> {
+ try {
+ synchronized (monitor) {
+ monitor.wait();
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ for (Future f : fs) {
+ f.get();
+ }
+ }
+
}