This is an automated email from the ASF dual-hosted git repository.

dengzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 8dbc351afca HIVE-27692: Explore removing the maintenance task from the embedded Metastore (#4782) (Zhihua Deng, reviewed by Naveen Gangam)
8dbc351afca is described below

commit 8dbc351afcaba38ee498f17825dad7a75a9b4524
Author: dengzh <[email protected]>
AuthorDate: Fri Feb 23 17:06:57 2024 +0800

    HIVE-27692: Explore removing the maintenance task from the embedded Metastore (#4782) (Zhihua Deng, reviewed by Naveen Gangam)
---
 .../ql/txn/compactor/TestDeltaFilesMetrics.java    | 36 ++++++++++++++--------
 .../apache/hadoop/hive/metastore/HMSHandler.java   | 27 ----------------
 .../hadoop/hive/metastore/TestMarkPartition.java   |  6 +++-
 3 files changed, 28 insertions(+), 41 deletions(-)

diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestDeltaFilesMetrics.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestDeltaFilesMetrics.java
index 2eb6185be69..8e332fbe455 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestDeltaFilesMetrics.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestDeltaFilesMetrics.java
@@ -35,6 +35,7 @@ import 
org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.metrics.AcidMetricService;
 import org.apache.hadoop.hive.metastore.metrics.Metrics;
 import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
 import org.junit.After;
@@ -52,6 +53,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
 
 public class TestDeltaFilesMetrics extends CompactorTest  {
 
+  private static AcidMetricService metricService;
+
   private void setUpHiveConf() {
     MetastoreConf.setLongVar(conf, 
MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD, 1);
     MetastoreConf.setLongVar(conf, 
MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD, 1);
@@ -64,11 +67,15 @@ public class TestDeltaFilesMetrics extends CompactorTest  {
 
   @Override
   @Before
-  public void setup() throws Exception {
+  public synchronized void setup() throws Exception {
     this.conf = new HiveConf();
     setUpHiveConf();
     setup(conf);
     MetricsFactory.init(conf);
+    if (metricService == null) {
+      metricService = new AcidMetricService();
+      metricService.setConf(conf);
+    }
   }
 
   @After
@@ -127,7 +134,7 @@ public class TestDeltaFilesMetrics extends CompactorTest  {
 
     startInitiator();
 
-    TimeUnit.SECONDS.sleep(2);
+    metricService.run();
     // 2 active deltas
     // 1 small delta
     // 0 obsolete deltas
@@ -143,7 +150,7 @@ public class TestDeltaFilesMetrics extends CompactorTest  {
 
     startWorker();
 
-    TimeUnit.SECONDS.sleep(2);
+    metricService.run();
     // 0 active deltas
     // 0 small delta
     // 2 obsolete deltas
@@ -177,7 +184,7 @@ public class TestDeltaFilesMetrics extends CompactorTest  {
     HiveConf.setIntVar(conf, 
HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 2);
     startInitiator();
 
-    TimeUnit.SECONDS.sleep(2);
+    metricService.run();
     // 3 active deltas
     // 2 small deltas
     // 2 obsolete deltas
@@ -193,7 +200,7 @@ public class TestDeltaFilesMetrics extends CompactorTest  {
 
     startCleaner();
 
-    TimeUnit.SECONDS.sleep(2);
+    metricService.run();
     // 3 active deltas
     // 2 small deltas
     // 0 obsolete delta
@@ -209,7 +216,7 @@ public class TestDeltaFilesMetrics extends CompactorTest  {
 
     startWorker();
 
-    TimeUnit.SECONDS.sleep(2);
+    metricService.run();
     // 1 active delta
     // 0 small delta
     // 3 obsolete deltas
@@ -225,7 +232,7 @@ public class TestDeltaFilesMetrics extends CompactorTest  {
 
     startCleaner();
 
-    TimeUnit.SECONDS.sleep(2);
+    metricService.run();
     verifyDeltaMetricsMatch(
         ImmutableMap.of(dbName + "." + tblName + Path.SEPARATOR + partName, 1),
         MetricsConstants.COMPACTION_NUM_DELTAS);
@@ -235,6 +242,7 @@ public class TestDeltaFilesMetrics extends CompactorTest  {
     verifyDeltaMetricsMatch(
         ImmutableMap.of(),
         MetricsConstants.COMPACTION_NUM_OBSOLETE_DELTAS);
+    ms.dropTable(dbName, tblName);
   }
 
   @Test
@@ -283,7 +291,7 @@ public class TestDeltaFilesMetrics extends CompactorTest  {
     HiveConf.setFloatVar(conf, 
HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD, 0.4f);
     startInitiator();
 
-    TimeUnit.SECONDS.sleep(2);
+    metricService.run();
     verifyDeltaMetricsMatch(
         ImmutableMap.of(
             dbName + "." + tblName + Path.SEPARATOR + part1Name, 2,
@@ -306,7 +314,7 @@ public class TestDeltaFilesMetrics extends CompactorTest  {
     startWorker();
     startWorker();
 
-    TimeUnit.SECONDS.sleep(2);
+    metricService.run();
     verifyDeltaMetricsMatch(
         ImmutableMap.of(
             dbName + "." + tblName + Path.SEPARATOR + part1Name, 2,
@@ -324,7 +332,7 @@ public class TestDeltaFilesMetrics extends CompactorTest  {
     startCleaner();
     startCleaner();
 
-    TimeUnit.SECONDS.sleep(2);
+    metricService.run();
     verifyDeltaMetricsMatch(
         ImmutableMap.of(
           dbName + "." + tblName + Path.SEPARATOR + part1Name, 2,
@@ -336,6 +344,7 @@ public class TestDeltaFilesMetrics extends CompactorTest  {
     verifyDeltaMetricsMatch(
         ImmutableMap.of(),
         MetricsConstants.COMPACTION_NUM_OBSOLETE_DELTAS);
+    ms.dropTable(dbName, tblName);
   }
 
   @Test
@@ -364,7 +373,7 @@ public class TestDeltaFilesMetrics extends CompactorTest  {
 
     startInitiator();
 
-    TimeUnit.SECONDS.sleep(2);
+    metricService.run();
     // 2 active deltas
     // 1 small delta
     // 0 obsolete deltas
@@ -380,7 +389,7 @@ public class TestDeltaFilesMetrics extends CompactorTest  {
 
     startWorker();
 
-    TimeUnit.SECONDS.sleep(2);
+    metricService.run();
     // 0 active delta
     // 0 small delta
     // 2 obsolete delta
@@ -396,7 +405,7 @@ public class TestDeltaFilesMetrics extends CompactorTest  {
 
     startCleaner();
 
-    TimeUnit.SECONDS.sleep(2);
+    metricService.run();
     // 0 active delta
     // 0 small delta
     // 0 obsolete delta
@@ -409,6 +418,7 @@ public class TestDeltaFilesMetrics extends CompactorTest  {
     verifyDeltaMetricsMatch(
         ImmutableMap.of(),
         MetricsConstants.COMPACTION_NUM_OBSOLETE_DELTAS);
+    ms.dropTable(dbName, tblName);
   }
 
   private LockComponent createLockComponent(String dbName, String tblName, 
String partName) {
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
index 3573ed23a7a..22eefbc1ad5 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
@@ -48,8 +48,6 @@ import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
 import 
org.apache.hadoop.hive.metastore.dataconnector.DataConnectorProviderFactory;
 import org.apache.hadoop.hive.metastore.events.*;
-import org.apache.hadoop.hive.metastore.leader.HouseKeepingTasks;
-import org.apache.hadoop.hive.metastore.leader.LeaderElectionContext;
 import org.apache.hadoop.hive.metastore.messaging.EventMessage;
 import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
 import org.apache.hadoop.hive.metastore.metrics.Metrics;
@@ -131,11 +129,6 @@ public class HMSHandler extends FacebookBase implements 
IHMSHandler {
   private final Configuration conf; // stores datastore (jpox) properties,
                                    // right now they come from jpox.properties
 
-  // Flag to control that always threads are initialized only once
-  // instead of multiple times
-  private final static AtomicBoolean alwaysThreadsInitialized =
-      new AtomicBoolean(false);
-
   private static String currentUrl;
   private FileMetadataManager fileMetadataManager;
   private PartitionExpressionProxy expressionProxy;
@@ -391,12 +384,6 @@ public class HMSHandler extends FacebookBase implements 
IHMSHandler {
       partitionValidationPattern = Pattern.compile(partitionValidationRegex);
     }
 
-    // We only initialize once the tasks that need to be run periodically. For 
remote metastore
-    // these threads are started along with the other housekeeping threads 
only in the leader
-    // HMS.
-    if (!HiveMetaStore.isMetaStoreRemote()) {
-      startAlwaysTaskThreads(conf, this);
-    }
     expressionProxy = PartFilterExprUtil.createExpressionProxy(conf);
     fileMetadataManager = new FileMetadataManager(this.getMS(), conf);
 
@@ -416,20 +403,6 @@ public class HMSHandler extends FacebookBase implements 
IHMSHandler {
     dataconnectorFactory = DataConnectorProviderFactory.getInstance(this);
   }
 
-  static void startAlwaysTaskThreads(Configuration conf, IHMSHandler handler) 
throws MetaException {
-    if (alwaysThreadsInitialized.compareAndSet(false, true)) {
-      try {
-        LeaderElectionContext context = new 
LeaderElectionContext.ContextBuilder(conf)
-            .setTType(LeaderElectionContext.TTYPE.ALWAYS_TASKS)
-            .addListener(new HouseKeepingTasks(conf, false))
-            .setHMSHandler(handler).build();
-        context.start();
-      } catch (Exception e) {
-        throw newMetaException(e);
-      }
-    }
-  }
-
   /**
    *
    * Filter is actually enabled only when the configured filter hook is 
configured, not default, and
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestMarkPartition.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestMarkPartition.java
index 811932f23ba..8e44dc3a46e 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestMarkPartition.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestMarkPartition.java
@@ -33,6 +33,7 @@ import 
org.apache.hadoop.hive.metastore.client.builder.DatabaseBuilder;
 import org.apache.hadoop.hive.metastore.client.builder.PartitionBuilder;
 import org.apache.hadoop.hive.metastore.client.builder.TableBuilder;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.events.EventCleanerTask;
 import org.apache.thrift.TException;
 import org.junit.Assert;
 import org.junit.Before;
@@ -57,6 +58,8 @@ public class TestMarkPartition {
 
   @Test
   public void testMarkingPartitionSet() throws TException, 
InterruptedException {
+    EventCleanerTask cleanerTask = new EventCleanerTask();
+    cleanerTask.setConf(conf);
     HiveMetaStoreClient msc = new HiveMetaStoreClient(conf);
 
     final String dbName = "hive2215";
@@ -83,7 +86,8 @@ public class TestMarkPartition {
     kvs.put("b", "'2011'");
     msc.markPartitionForEvent(dbName, tableName, kvs, 
PartitionEventType.LOAD_DONE);
     Assert.assertTrue(msc.isPartitionMarkedForEvent(dbName, tableName, kvs, 
PartitionEventType.LOAD_DONE));
-    Thread.sleep(10000);
+    Thread.sleep(3000);
+    cleanerTask.run();
     Assert.assertFalse(msc.isPartitionMarkedForEvent(dbName, tableName, kvs, 
PartitionEventType.LOAD_DONE));
 
     kvs.put("b", "'2012'");

Reply via email to