This is an automated email from the ASF dual-hosted git repository.

dengzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 1097dde68d8 HIVE-28001: Fix the flaky test TestLeaderElection (#5011) (Zhihua Deng, reviewed by Sai Hemanth Gantasala)
1097dde68d8 is described below

commit 1097dde68d829ce3863ee667afb9c6332b7f2f0a
Author: dengzh <[email protected]>
AuthorDate: Thu Jan 18 14:12:19 2024 +0800

    HIVE-28001: Fix the flaky test TestLeaderElection (#5011) (Zhihua Deng, reviewed by Sai Hemanth Gantasala)
---
 .../hadoop/hive/metastore/conf/MetastoreConf.java  |  5 +-
 .../metastore/leader/LeaderElectionContext.java    | 25 ++++---
 .../metastore/leader/LeaderElectionFactory.java    |  6 +-
 .../hive/metastore/leader/LeaseLeaderElection.java | 25 ++++---
 .../hive/metastore/leader/TestLeaderElection.java  | 78 ++++++++++++++--------
 5 files changed, 92 insertions(+), 47 deletions(-)

diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index ffa3d6a06c5..792e27fc99c 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -580,7 +580,7 @@ public class MetastoreConf {
         "metastore.housekeeping.leader.election",
         "host", new StringSetValidator("host", "lock"),
         "Set to host, HMS will choose the leader by the configured 
metastore.housekeeping.leader.hostname.\n" +
-        "Set to lock, HMS will use the hive lock to elect the leader."),
+        "Set to lock, HMS will use the Hive lock to elect the leader."),
     
METASTORE_HOUSEKEEPING_LEADER_AUDITTABLE("metastore.housekeeping.leader.auditTable",
         "metastore.housekeeping.leader.auditTable", "",
         "Audit the leader election event to a plain json table when 
configured."),
@@ -593,6 +593,9 @@ public class MetastoreConf {
         "metastore.housekeeping.leader.auditFiles.limit", 10,
         "Limit the number of small audit files when 
metastore.housekeeping.leader.newAuditFile is true.\n" +
         "If the number of audit files exceeds the limit, then the oldest will 
be deleted."),
+    
METASTORE_HOUSEKEEPING_LEADER_LOCK_NAMESPACE("metastore.housekeeping.leader.lock.namespace",
+        "metastore.housekeeping.leader.lock.namespace", "",
+        "The database where the Hive lock sits when 
metastore.housekeeping.leader.election is set to lock."),
     METASTORE_HOUSEKEEPING_THREADS_ON("metastore.housekeeping.threads.on",
         "hive.metastore.housekeeping.threads.on", false,
         "Whether to run the tasks under metastore.task.threads.remote on this 
metastore instance or not.\n" +
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionContext.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionContext.java
index 3c01d8030ca..a3652d1c001 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionContext.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionContext.java
@@ -42,12 +42,12 @@ public class LeaderElectionContext {
    * For those tasks which belong to the same type, they will be running in 
the same leader.
    */
   public enum TTYPE {
-    HOUSEKEEPING(new TableName(Warehouse.DEFAULT_CATALOG_NAME, "sys",
-        "metastore_housekeeping_leader"), "housekeeping"),
-    WORKER(new TableName(Warehouse.DEFAULT_CATALOG_NAME, "sys",
-        "metastore_worker_leader"), "compactor_worker"),
-    ALWAYS_TASKS(new TableName(Warehouse.DEFAULT_CATALOG_NAME, "sys",
-        "metastore_always_tasks_leader"), "always_tasks");
+    HOUSEKEEPING(new TableName(Warehouse.DEFAULT_CATALOG_NAME, 
"__METASTORE_LEADER_ELECTION__",
+        "metastore_housekeeping"), "housekeeping"),
+    WORKER(new TableName(Warehouse.DEFAULT_CATALOG_NAME, 
"__METASTORE_LEADER_ELECTION__",
+        "metastore_compactor_worker"), "compactor_worker"),
+    ALWAYS_TASKS(new TableName(Warehouse.DEFAULT_CATALOG_NAME, 
"__METASTORE_LEADER_ELECTION__",
+        "metastore_always_tasks"), "always_tasks");
     // Mutex of TTYPE, which can be a nonexistent table
     private final TableName mutex;
     // Name of TTYPE
@@ -127,9 +127,10 @@ public class LeaderElectionContext {
           throw new RuntimeException("Error claiming to be leader: " + 
leaderElection.getName(), e);
         }
       });
+      daemon.setName("Metastore Election " + leaderElection.getName());
+      daemon.setDaemon(true);
+
       if (startAsDaemon) {
-        daemon.setName("Leader-Election-" + leaderElection.getName());
-        daemon.setDaemon(true);
         daemon.start();
       } else {
         daemon.run();
@@ -154,7 +155,13 @@ public class LeaderElectionContext {
     case "host":
       return servHost;
     case "lock":
-      return ttype.getTableName();
+      TableName mutex = ttype.getTableName();
+      String namespace =
+          MetastoreConf.getVar(conf, 
MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_LOCK_NAMESPACE);
+      if (StringUtils.isNotEmpty(namespace)) {
+        return new TableName(mutex.getCat(), namespace, mutex.getTable());
+      }
+      return mutex;
     default:
       throw new UnsupportedOperationException(method + " not supported for 
leader election");
     }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionFactory.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionFactory.java
index 51dce28ac50..5055ad8a003 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionFactory.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElectionFactory.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.metastore.leader;
 
+import java.io.IOException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 
@@ -26,7 +28,7 @@ import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
  */
 public class LeaderElectionFactory {
 
-  public static LeaderElection create(Configuration conf) {
+  public static LeaderElection create(Configuration conf) throws IOException  {
     String method =
         MetastoreConf.getVar(conf, 
MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_ELECTION);
     switch (method.toLowerCase()) {
@@ -35,7 +37,7 @@ public class LeaderElectionFactory {
       case "lock":
         return new LeaseLeaderElection();
       default:
-        throw new UnsupportedOperationException("Do not support " + method + " 
now");
+        throw new UnsupportedOperationException(method + " is not supported 
for electing the leader");
     }
   }
 
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java
index 937174f5d92..d6ad76dcce9 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.List;
@@ -92,9 +93,17 @@ public class LeaseLeaderElection implements LeaderElection<TableName> {
   public static final String METASTORE_RENEW_LEASE = 
"metastore.renew.leader.lease";
 
   private String name;
+  private String userName;
+  private String hostName;
 
-  private void doWork(LockResponse resp, Configuration conf,
+  public LeaseLeaderElection() throws IOException {
+    userName = SecurityUtils.getUser();
+    hostName = InetAddress.getLocalHost().getHostName();
+  }
+
+  private synchronized void doWork(LockResponse resp, Configuration conf,
       TableName tableName) throws LeaderException {
+    long start = System.currentTimeMillis();
     lockId = resp.getLockid();
     assert resp.getState() == LockState.ACQUIRED || resp.getState() == 
LockState.WAITING;
     shutdownWatcher();
@@ -121,6 +130,7 @@ public class LeaseLeaderElection implements LeaderElection<TableName> {
     default:
       throw new IllegalStateException("Unexpected lock state: " + 
resp.getState());
     }
+    LOG.debug("Spent {}ms to notify the listeners, isLeader: {}", 
System.currentTimeMillis() - start, isLeader);
   }
 
   private void notifyListener() {
@@ -142,13 +152,6 @@ public class LeaseLeaderElection implements LeaderElection<TableName> {
   public void tryBeLeader(Configuration conf, TableName table) throws 
LeaderException {
     requireNonNull(conf, "conf is null");
     requireNonNull(table, "table is null");
-    String user, hostName;
-    try {
-      user = SecurityUtils.getUser();
-      hostName = InetAddress.getLocalHost().getHostName();
-    } catch (Exception e) {
-      throw new LeaderException("Error while getting the username", e);
-    }
 
     if (store == null) {
       store = TxnUtils.getTxnStore(conf);
@@ -165,7 +168,7 @@ public class LeaseLeaderElection implements LeaderElection<TableName> {
     boolean lockable = false;
     Exception recentException = null;
     long start = System.currentTimeMillis();
-    LockRequest req = new LockRequest(components, user, hostName);
+    LockRequest req = new LockRequest(components, userName, hostName);
     int numRetries = MetastoreConf.getIntVar(conf, 
MetastoreConf.ConfVars.LOCK_NUMRETRIES);
     long maxSleep = MetastoreConf.getTimeVar(conf,
         MetastoreConf.ConfVars.LOCK_SLEEP_BETWEEN_RETRIES, 
TimeUnit.MILLISECONDS);
@@ -175,6 +178,7 @@ public class LeaseLeaderElection implements LeaderElection<TableName> {
         if (res.getState() == LockState.WAITING || res.getState() == 
LockState.ACQUIRED) {
           lockable = true;
           doWork(res, conf, table);
+          LOG.debug("Spent {}ms to lock the table {}, retries: {}", 
System.currentTimeMillis() - start, table, i);
           break;
         }
       } catch (NoSuchTxnException | TxnAbortedException e) {
@@ -324,6 +328,7 @@ public class LeaseLeaderElection implements LeaderElection<TableName> {
       } catch (NoSuchTxnException | TxnAbortedException e) {
         throw new AssertionError("This should not happen, we didn't open txn", 
e);
       } catch (NoSuchLockException e) {
+        LOG.info("No such lock {} for NonLeaderWatcher, try to obtain the lock 
again...", lockId);
         reclaim();
       } catch (Exception e) {
         // Wait for next cycle.
@@ -379,6 +384,7 @@ public class LeaseLeaderElection implements LeaderElection<TableName> {
       } catch (NoSuchTxnException | TxnAbortedException e) {
         throw new AssertionError("This should not happen, we didn't open txn", 
e);
       } catch (NoSuchLockException e) {
+        LOG.info("No such lock {} for Heartbeater, try to obtain the lock 
again...", lockId);
         reclaim();
       } catch (Exception e) {
         // Wait for next cycle.
@@ -404,6 +410,7 @@ public class LeaseLeaderElection implements LeaderElection<TableName> {
       super(conf, tableName);
       timeout = MetastoreConf.getTimeVar(conf,
           MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS) + 3000;
+      setName("ReleaseAndRequireWatcher");
     }
 
     @Override
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/leader/TestLeaderElection.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/leader/TestLeaderElection.java
index 99d13c00b4a..59f7dbc8fd6 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/leader/TestLeaderElection.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/leader/TestLeaderElection.java
@@ -27,6 +27,9 @@ import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
 import org.junit.Test;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -52,6 +55,28 @@ public class TestLeaderElection {
     assertFalse(election.isLeader());
   }
 
+  static class TestLeaderListener implements 
LeaderElection.LeadershipStateListener {
+    AtomicBoolean flag;
+    TestLeaderListener(AtomicBoolean flag) {
+      this.flag = flag;
+    }
+    @Override
+    public void takeLeadership(LeaderElection election) throws Exception {
+      synchronized (flag) {
+        flag.set(true);
+        flag.notifyAll();
+      }
+    }
+
+    @Override
+    public void lossLeadership(LeaderElection election) throws Exception {
+      synchronized (flag) {
+        flag.set(false);
+        flag.notifyAll();
+      }
+    }
+  }
+
   @Test
   public void testLeaseLeaderElection() throws Exception {
     Configuration configuration = MetastoreConf.newMetastoreConf();
@@ -68,16 +93,7 @@ public class TestLeaderElection {
     TableName mutex = new TableName("hive", "default", "leader_lease_ms");
     LeaseLeaderElection instance1 = new LeaseLeaderElection();
     AtomicBoolean flag1 = new AtomicBoolean(false);
-    instance1.addStateListener(new LeaderElection.LeadershipStateListener() {
-      @Override
-      public void takeLeadership(LeaderElection election) {
-        flag1.set(true);
-      }
-      @Override
-      public void lossLeadership(LeaderElection election) {
-        flag1.set(false);
-      }
-    });
+    instance1.addStateListener(new TestLeaderListener(flag1));
     instance1.tryBeLeader(configuration, mutex);
     // elect1 as a leader now
     assertTrue(flag1.get() && instance1.isLeader());
@@ -85,31 +101,22 @@ public class TestLeaderElection {
     configuration.setBoolean(LeaseLeaderElection.METASTORE_RENEW_LEASE, true);
     LeaseLeaderElection instance2 = new LeaseLeaderElection();
     AtomicBoolean flag2 = new AtomicBoolean(false);
-    instance2.addStateListener(new LeaderElection.LeadershipStateListener() {
-      @Override
-      public void takeLeadership(LeaderElection election) {
-        flag2.set(true);
-      }
-      @Override
-      public void lossLeadership(LeaderElection election) {
-        flag2.set(false);
-      }
-    });
+    instance2.addStateListener(new TestLeaderListener(flag2));
     instance2.tryBeLeader(configuration, mutex);
-
     // instance2 should not be leader as elect1 holds the lease
     assertFalse(flag2.get() || instance2.isLeader());
-    Thread.sleep(15 * 1000);
+
+    ExecutorService service = Executors.newFixedThreadPool(4);
+    wait(service, flag1, flag2);
     // now instance1 lease is timeout, the instance2 should be leader now
     assertTrue(instance2.isLeader() && flag2.get());
-
     assertFalse(flag1.get() || instance1.isLeader());
     assertTrue(flag2.get() && instance2.isLeader());
+
     // remove leader's lease (instance2)
     long lockId2 = instance2.getLockId();
     txnStore.unlock(new UnlockRequest(lockId2));
-    Thread.sleep(4 * 1000);
-    assertTrue(flag1.get() && instance1.isLeader());
+    wait(service, flag1, flag2);
     assertFalse(flag2.get() || instance2.isLeader());
     assertTrue(lockId2 > 0);
     assertFalse(instance2.getLockId() == lockId2);
@@ -117,7 +124,7 @@ public class TestLeaderElection {
     // remove leader's lease(instance1)
     long lockId1 = instance1.getLockId();
     txnStore.unlock(new UnlockRequest(lockId1));
-    Thread.sleep(4 * 1000);
+    wait(service, flag1, flag2);
     assertFalse(lockId1 == instance1.getLockId());
     assertTrue(lockId1 > 0);
 
@@ -128,4 +135,23 @@ public class TestLeaderElection {
     }
   }
 
+  private void wait(ExecutorService service, Object... obj) throws Exception {
+    Future[] fs = new Future[obj.length];
+    for (int i = 0; i < obj.length; i++) {
+      Object monitor = obj[i];
+      fs[i] = service.submit(() -> {
+        try {
+          synchronized (monitor) {
+            monitor.wait();
+          }
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+      });
+    }
+    for (Future f : fs) {
+      f.get();
+    }
+  }
+
 }

Reply via email to