This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new e4f60bf8ac1 HIVE-27022: Extract compaction-related functionality from 
AcidHouseKeeper into a separate service (Taraka Rama Rao Lethavadla, reviewed 
by Denys Kuzmenko, Zsolt Miskolczi)
e4f60bf8ac1 is described below

commit e4f60bf8ac197224062751141211df72d79827a9
Author: tarak271 <ta...@cloudera.com>
AuthorDate: Mon Feb 12 22:00:47 2024 +0530

    HIVE-27022: Extract compaction-related functionality from AcidHouseKeeper 
into a separate service (Taraka Rama Rao Lethavadla, reviewed by Denys 
Kuzmenko, Zsolt Miskolczi)
    
    Closes #4970
---
 .../parse/TestTimedOutTxnNotificationLogging.java  |  4 +-
 .../org/apache/hadoop/hive/ql/TestTxnCommands.java |  2 +-
 .../apache/hadoop/hive/ql/TestTxnCommands2.java    | 15 ++++++-
 .../hadoop/hive/ql/lockmgr/TestDbTxnManager.java   |  2 +-
 .../hadoop/hive/ql/lockmgr/TestDbTxnManager2.java  |  7 ++-
 .../hadoop/hive/metastore/conf/MetastoreConf.java  | 17 +++++---
 .../hive/metastore/leader/HouseKeepingTasks.java   |  7 +++
 .../hadoop/hive/metastore/txn/TxnHandler.java      |  1 +
 .../txn/{ => service}/AcidHouseKeeperService.java  | 50 ++++++++++++---------
 .../{ => service}/AcidOpenTxnsCounterService.java  |  8 ++--
 .../txn/{ => service}/AcidTxnCleanerService.java   |  8 ++--
 .../txn/service/CompactionHouseKeeperService.java  | 51 ++++++++++++++++++++++
 .../hive/metastore/conf/TestMetastoreConf.java     | 11 +++--
 .../metastore/txn/TestAcidTxnCleanerService.java   |  1 +
 .../org/apache/hive/streaming/TestStreaming.java   |  2 +-
 15 files changed, 143 insertions(+), 43 deletions(-)

diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTimedOutTxnNotificationLogging.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTimedOutTxnNotificationLogging.java
index 559699cf3c3..130e4908b3c 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTimedOutTxnNotificationLogging.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTimedOutTxnNotificationLogging.java
@@ -34,8 +34,8 @@ import 
org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.CatalogFilter;
 import 
org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
 import 
org.apache.hadoop.hive.metastore.messaging.event.filters.ReplEventFilter;
-import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
-import org.apache.hadoop.hive.metastore.txn.AcidTxnCleanerService;
+import org.apache.hadoop.hive.metastore.txn.service.AcidHouseKeeperService;
+import org.apache.hadoop.hive.metastore.txn.service.AcidTxnCleanerService;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java 
b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 46c8e824456..39e09a8eb17 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -66,7 +66,7 @@ import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
 import org.apache.hadoop.hive.metastore.api.TxnInfo;
 import org.apache.hadoop.hive.metastore.api.TxnState;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
+import org.apache.hadoop.hive.metastore.txn.service.AcidHouseKeeperService;
 import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java 
b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 9b2edfa10f5..3f574e384ed 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -63,7 +63,8 @@ import 
org.apache.hadoop.hive.metastore.api.AbortCompactResponse;
 import org.apache.hadoop.hive.metastore.api.AbortCompactionRequest;
 import org.apache.hadoop.hive.metastore.api.AbortCompactionResponseElement;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
+import 
org.apache.hadoop.hive.metastore.txn.service.CompactionHouseKeeperService;
+import org.apache.hadoop.hive.metastore.txn.service.AcidHouseKeeperService;
 import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.ql.ddl.DDLTask;
@@ -74,7 +75,7 @@ import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
-import org.apache.hadoop.hive.metastore.txn.AcidOpenTxnsCounterService;
+import org.apache.hadoop.hive.metastore.txn.service.AcidOpenTxnsCounterService;
 import org.apache.hadoop.hive.ql.scheduled.ScheduledQueryExecutionContext;
 import org.apache.hadoop.hive.ql.scheduled.ScheduledQueryExecutionService;
 import org.apache.hadoop.hive.ql.schq.MockScheduledQueryService;
@@ -1244,9 +1245,14 @@ public class TestTxnCommands2 extends 
TxnCommandsBaseForTests {
 
     MetastoreConf.setTimeVar(hiveConf, 
MetastoreConf.ConfVars.ACID_HOUSEKEEPER_SERVICE_INTERVAL, 10,
         TimeUnit.MILLISECONDS);
+    MetastoreConf.setTimeVar(hiveConf, 
MetastoreConf.ConfVars.COMPACTION_HOUSEKEEPER_SERVICE_INTERVAL, 10,
+        TimeUnit.MILLISECONDS);
     MetastoreTaskThread houseKeeper = new AcidHouseKeeperService();
+    MetastoreTaskThread compactionHouseKeeper = new 
CompactionHouseKeeperService();
     houseKeeper.setConf(hiveConf);
+    compactionHouseKeeper.setConf(hiveConf);
     houseKeeper.run();
+    compactionHouseKeeper.run();
     checkCompactionState(new 
CompactionsByState(numDidNotInitiateCompactions,numFailedCompactions,0,0,0,0,numFailedCompactions
 + numDidNotInitiateCompactions), countCompacts(txnHandler));
 
     txnHandler.compact(new CompactionRequest("default", tblName, 
CompactionType.MAJOR));
@@ -1260,6 +1266,7 @@ public class TestTxnCommands2 extends 
TxnCommandsBaseForTests {
     checkCompactionState(new 
CompactionsByState(numDidNotInitiateCompactions,numFailedCompactions + 
2,0,0,0,0,numFailedCompactions + 2 + numDidNotInitiateCompactions), 
countCompacts(txnHandler));
 
     houseKeeper.run();
+    compactionHouseKeeper.run();
     //COMPACTOR_HISTORY_RETENTION_FAILED failed compacts left (and no other 
since we only have failed ones here)
     checkCompactionState(new CompactionsByState(
       MetastoreConf.getIntVar(hiveConf, 
MetastoreConf.ConfVars.COMPACTOR_HISTORY_RETENTION_DID_NOT_INITIATE),
@@ -1287,6 +1294,7 @@ public class TestTxnCommands2 extends 
TxnCommandsBaseForTests {
 
     runCleaner(hiveConf); // transition to Success state
     houseKeeper.run();
+    compactionHouseKeeper.run();
     checkCompactionState(new CompactionsByState(
             MetastoreConf.getIntVar(hiveConf, 
MetastoreConf.ConfVars.COMPACTOR_HISTORY_RETENTION_DID_NOT_INITIATE),
             MetastoreConf.getIntVar(hiveConf, 
MetastoreConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), 0, 0, 1, 0,
@@ -3346,8 +3354,11 @@ public class TestTxnCommands2 extends 
TxnCommandsBaseForTests {
 
     // Run AcidHouseKeeperService to cleanup the COMPLETED_TXN_COMPONENTS.
     MetastoreTaskThread houseKeeper = new AcidHouseKeeperService();
+    MetastoreTaskThread compactionHouseKeeper = new 
CompactionHouseKeeperService();
     houseKeeper.setConf(hiveConf);
+    compactionHouseKeeper.setConf(hiveConf);
     houseKeeper.run();
+    compactionHouseKeeper.run();
 
     // Check whether the table is compacted.
     fileStatuses = fs.globStatus(new Path(getWarehouseDir() + "/" + tableName 
+ "/*"));
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java 
b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index c2d93e0f95e..b6f533490ff 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.metrics.Metrics;
 import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
-import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
+import org.apache.hadoop.hive.metastore.txn.service.AcidHouseKeeperService;
 import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java 
b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index f7a02cc7674..250aa738204 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -38,7 +38,8 @@ import 
org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
 import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
+import 
org.apache.hadoop.hive.metastore.txn.service.CompactionHouseKeeperService;
+import org.apache.hadoop.hive.metastore.txn.service.AcidHouseKeeperService;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -3509,8 +3510,11 @@ public class TestDbTxnManager2 extends 
DbTxnManagerEndToEndTestBase{
       5, TestTxnDbUtil.countQueryAgent(conf, "select count(*) from 
\"COMPLETED_TXN_COMPONENTS\""));
 
     MetastoreTaskThread houseKeeper = new AcidHouseKeeperService();
+    MetastoreTaskThread compactionHouseKeeper = new 
CompactionHouseKeeperService();
     houseKeeper.setConf(conf);
+    compactionHouseKeeper.setConf(conf);
     houseKeeper.run();
+    compactionHouseKeeper.run();
 
     Assert.assertEquals(TestTxnDbUtil.queryToString(conf, "select * from 
\"COMPLETED_TXN_COMPONENTS\""),
       2, TestTxnDbUtil.countQueryAgent(conf, "select count(*) from 
\"COMPLETED_TXN_COMPONENTS\""));
@@ -3527,6 +3531,7 @@ public class TestDbTxnManager2 extends 
DbTxnManagerEndToEndTestBase{
       4, TestTxnDbUtil.countQueryAgent(conf, "select count(*) from 
\"COMPLETED_TXN_COMPONENTS\""));
 
     houseKeeper.run();
+    compactionHouseKeeper.run();
     Assert.assertEquals(TestTxnDbUtil.queryToString(conf, "select * from 
\"COMPLETED_TXN_COMPONENTS\""),
       3, TestTxnDbUtil.countQueryAgent(conf, "select count(*) from 
\"COMPLETED_TXN_COMPONENTS\""));
 
diff --git 
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 792e27fc99c..585b55c39f1 100644
--- 
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -94,14 +94,17 @@ public class MetastoreConf {
   static final String METASTORE_DELEGATION_MANAGER_CLASS =
       
"org.apache.hadoop.hive.metastore.security.MetastoreDelegationTokenManager";
   @VisibleForTesting
-  static final String ACID_HOUSE_KEEPER_SERVICE_CLASS =
-      "org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService";
+  static final String ACID_HOUSEKEEPER_SERVICE_CLASS =
+      "org.apache.hadoop.hive.metastore.txn.service.AcidHouseKeeperService";
+  @VisibleForTesting
+  static final String COMPACTION_HOUSEKEEPER_SERVICE_CLASS =
+      
"org.apache.hadoop.hive.metastore.txn.service.CompactionHouseKeeperService";
   @VisibleForTesting
   static final String ACID_TXN_CLEANER_SERVICE_CLASS =
-      "org.apache.hadoop.hive.metastore.txn.AcidTxnCleanerService";
+      "org.apache.hadoop.hive.metastore.txn.service.AcidTxnCleanerService";
   @VisibleForTesting
   static final String ACID_OPEN_TXNS_COUNTER_SERVICE_CLASS =
-      "org.apache.hadoop.hive.metastore.txn.AcidOpenTxnsCounterService";
+      
"org.apache.hadoop.hive.metastore.txn.service.AcidOpenTxnsCounterService";
 
   public static final String 
METASTORE_AUTHENTICATION_LDAP_USERMEMBERSHIPKEY_NAME =
           "metastore.authentication.ldap.userMembershipKey";
@@ -293,6 +296,9 @@ public class MetastoreConf {
     ACID_HOUSEKEEPER_SERVICE_INTERVAL("metastore.acid.housekeeper.interval",
         "hive.metastore.acid.housekeeper.interval", 60, TimeUnit.SECONDS,
         "Time interval describing how often the acid housekeeper runs."),
+    
COMPACTION_HOUSEKEEPER_SERVICE_INTERVAL("metastore.compaction.housekeeper.interval",
+        "hive.metastore.compaction.housekeeper.interval", 300, 
TimeUnit.SECONDS,
+        "Time interval describing how often the acid compaction housekeeper 
runs."),
     ACID_TXN_CLEANER_INTERVAL("metastore.acid.txn.cleaner.interval",
         "hive.metastore.acid.txn.cleaner.interval", 10, TimeUnit.SECONDS,
         "Time interval describing how often aborted and committed txns are 
cleaned."),
@@ -1467,7 +1473,8 @@ public class MetastoreConf {
             "always be started, regardless of whether the metastore is running 
in embedded mode " +
             "or in server mode.  They must implement " + 
METASTORE_TASK_THREAD_CLASS),
     TASK_THREADS_REMOTE_ONLY("metastore.task.threads.remote", 
"metastore.task.threads.remote",
-        ACID_HOUSE_KEEPER_SERVICE_CLASS + "," +
+        ACID_HOUSEKEEPER_SERVICE_CLASS + "," +
+                COMPACTION_HOUSEKEEPER_SERVICE_CLASS + "," +
             ACID_TXN_CLEANER_SERVICE_CLASS + "," +
             ACID_OPEN_TXNS_COUNTER_SERVICE_CLASS + "," +
             MATERIALZIATIONS_REBUILD_LOCK_CLEANER_TASK_CLASS + "," +
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/HouseKeepingTasks.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/HouseKeepingTasks.java
index 5db4bba3f4e..3a4414fd004 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/HouseKeepingTasks.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/HouseKeepingTasks.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.metastore.HiveMetaStore;
 import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
 import org.apache.hadoop.hive.metastore.ThreadPool;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import 
org.apache.hadoop.hive.metastore.txn.service.CompactionHouseKeeperService;
 import org.apache.hadoop.hive.metastore.utils.JavaUtils;
 
 import java.util.ArrayList;
@@ -58,9 +59,15 @@ public class HouseKeepingTasks implements 
LeaderElection.LeadershipStateListener
         MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_THREADS_ON)) {
       return remoteOnlyTasks;
     }
+    boolean isCompactorEnabled = MetastoreConf.getBoolVar(configuration, 
MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON)
+            || MetastoreConf.getBoolVar(configuration, 
MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON);
+
     Collection<String> taskNames =
         MetastoreConf.getStringCollection(configuration, 
MetastoreConf.ConfVars.TASK_THREADS_REMOTE_ONLY);
     for (String taskName : taskNames) {
+      if (CompactionHouseKeeperService.class.getName().equals(taskName) && 
!isCompactorEnabled) {
+        continue;
+      }
       MetastoreTaskThread task =
           JavaUtils.newInstance(JavaUtils.getClass(taskName, 
MetastoreTaskThread.class));
       remoteOnlyTasks.add(task);
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 9be5e475b6e..92437bdbba0 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -99,6 +99,7 @@ import 
org.apache.hadoop.hive.metastore.txn.jdbc.ParameterizedCommand;
 import org.apache.hadoop.hive.metastore.txn.retry.SqlRetryCallProperties;
 import org.apache.hadoop.hive.metastore.txn.retry.SqlRetryException;
 import org.apache.hadoop.hive.metastore.txn.retry.SqlRetryHandler;
+import org.apache.hadoop.hive.metastore.txn.service.AcidHouseKeeperService;
 import org.apache.hadoop.hive.metastore.utils.JavaUtils;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
 import org.apache.hadoop.util.StringUtils;
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidHouseKeeperService.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidHouseKeeperService.java
similarity index 65%
rename from 
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidHouseKeeperService.java
rename to 
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidHouseKeeperService.java
index 0a16006c49e..86799e90621 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidHouseKeeperService.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidHouseKeeperService.java
@@ -15,19 +15,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hive.metastore.txn;
+package org.apache.hadoop.hive.metastore.txn.service;
+
+import com.google.common.collect.ImmutableMap;
 
-import org.apache.commons.lang3.Functions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.commons.lang3.Functions.FailableRunnable;
+import org.apache.commons.lang3.function.FailableRunnable;
+import org.apache.commons.lang3.function.Failable;
 
 /**
  * Performs background tasks for Transaction management in Hive.
@@ -38,16 +43,27 @@ public class AcidHouseKeeperService implements 
MetastoreTaskThread {
   private static final Logger LOG = 
LoggerFactory.getLogger(AcidHouseKeeperService.class);
 
   private Configuration conf;
-  private boolean isCompactorEnabled;
-  private TxnStore txnHandler;
+  protected TxnStore txnHandler;
+  protected String serviceName;
+  protected Map<FailableRunnable<MetaException>, String> tasks;
+
+  public AcidHouseKeeperService() {
+    serviceName = this.getClass().getSimpleName();
+  }
 
   @Override
   public void setConf(Configuration configuration) {
     conf = configuration;
-    isCompactorEnabled =
-        MetastoreConf.getBoolVar(conf, 
MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON) || MetastoreConf.getBoolVar(conf,
-            MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON);
     txnHandler = TxnUtils.getTxnStore(conf);
+    initTasks();
+  }
+
+  protected void initTasks(){
+    tasks = ImmutableMap.<FailableRunnable<MetaException>, String>builder()
+            .put(txnHandler::performTimeOuts, "Cleaning timed out txns and 
locks")
+            .put(txnHandler::performWriteSetGC, "Cleaning obsolete write set 
entries")
+            .put(txnHandler::cleanTxnToWriteIdTable, "Cleaning obsolete 
TXN_TO_WRITE_ID entries")
+            .build();
   }
 
   @Override
@@ -65,12 +81,12 @@ public class AcidHouseKeeperService implements 
MetastoreTaskThread {
     TxnStore.MutexAPI.LockHandle handle = null;
     try {
       handle = 
txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.HouseKeeper.name());
-      LOG.info("Starting to run AcidHouseKeeperService.");
+      LOG.info("Starting to run {}", serviceName);
       long start = System.currentTimeMillis();
       cleanTheHouse();
-      LOG.debug("Total time AcidHouseKeeperService took: {} seconds.", 
elapsedSince(start));
-    } catch (Throwable t) {
-      LOG.error("Unexpected error in thread: {}, message: {}", 
Thread.currentThread().getName(), t.getMessage(), t);
+      LOG.debug("Total time {} took: {} seconds.", serviceName, 
elapsedSince(start));
+    } catch (Exception e) {
+      LOG.error("Unexpected exception in thread: {}, message: {}", 
Thread.currentThread().getName(), e.getMessage(), e);
     } finally {
       if (handle != null) {
         handle.releaseLocks();
@@ -79,18 +95,12 @@ public class AcidHouseKeeperService implements 
MetastoreTaskThread {
   }
 
   private void cleanTheHouse() {
-    performTask(txnHandler::performTimeOuts, "Cleaning timed out txns and 
locks");
-    performTask(txnHandler::performWriteSetGC, "Cleaning obsolete write set 
entries");
-    performTask(txnHandler::cleanTxnToWriteIdTable, "Cleaning obsolete 
TXN_TO_WRITE_ID entries");
-    if (isCompactorEnabled) {
-      performTask(txnHandler::removeDuplicateCompletedTxnComponents, "Cleaning 
duplicate COMPLETED_TXN_COMPONENTS entries");
-      performTask(txnHandler::purgeCompactionHistory, "Cleaning obsolete 
compaction history entries");
-    }
+    tasks.forEach(this::performTask);
   }
 
   private void performTask(FailableRunnable<MetaException> task, String 
description) {
     long start = System.currentTimeMillis();
-    Functions.run(task);
+    Failable.run(task);
     LOG.debug("{} took {} seconds.", description, elapsedSince(start));
   }
 
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidOpenTxnsCounterService.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidOpenTxnsCounterService.java
similarity index 88%
rename from 
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidOpenTxnsCounterService.java
rename to 
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidOpenTxnsCounterService.java
index fe788004398..89dbff3f96f 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidOpenTxnsCounterService.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidOpenTxnsCounterService.java
@@ -15,10 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hive.metastore.txn;
+package org.apache.hadoop.hive.metastore.txn.service;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,8 +57,8 @@ public class AcidOpenTxnsCounterService implements 
MetastoreTaskThread {
         lastLogTime = now;
       }
     }
-    catch (Throwable t) {
-      LOG.error("Unexpected error in thread: {}, message: {}", 
Thread.currentThread().getName(), t.getMessage(), t);
+    catch (Exception e) {
+      LOG.error("Unexpected exception in thread: {}, message: {}", 
Thread.currentThread().getName(), e.getMessage(), e);
     }
   }
 
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidTxnCleanerService.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidTxnCleanerService.java
similarity index 88%
rename from 
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidTxnCleanerService.java
rename to 
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidTxnCleanerService.java
index d2800bd008b..06f284faee0 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidTxnCleanerService.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidTxnCleanerService.java
@@ -15,11 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hive.metastore.txn;
+package org.apache.hadoop.hive.metastore.txn.service;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,8 +62,8 @@ public class AcidTxnCleanerService implements 
MetastoreTaskThread {
       long start = System.currentTimeMillis();
       txnHandler.cleanEmptyAbortedAndCommittedTxns();
       LOG.debug("Txn cleaner service took: {} seconds.", elapsedSince(start));
-    } catch (Throwable t) {
-      LOG.error("Unexpected error in thread: {}, message: {}", 
Thread.currentThread().getName(), t.getMessage(), t);
+    } catch (Exception e) {
+      LOG.error("Unexpected exception in thread: {}, message: {}", 
Thread.currentThread().getName(), e.getMessage(), e);
     } finally {
       if (handle != null) {
         handle.releaseLocks();
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/CompactionHouseKeeperService.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/CompactionHouseKeeperService.java
new file mode 100644
index 00000000000..6eca4828344
--- /dev/null
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/CompactionHouseKeeperService.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn.service;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.lang3.function.FailableRunnable;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Performs background tasks for Transaction management in Hive.
+ * Runs inside Hive Metastore Service.
+ */
+public class CompactionHouseKeeperService extends AcidHouseKeeperService {
+
+  public CompactionHouseKeeperService() {
+    serviceName = this.getClass().getSimpleName();
+  }
+
+  @Override
+  protected void initTasks(){
+    tasks = ImmutableMap.<FailableRunnable<MetaException>, String>builder()
+            .put(txnHandler::removeDuplicateCompletedTxnComponents,
+                    "Cleaning duplicate COMPLETED_TXN_COMPONENTS entries")
+            .put(txnHandler::purgeCompactionHistory, "Cleaning obsolete 
compaction history entries")
+            .build();
+  }
+
+  @Override
+  public long runFrequency(TimeUnit unit) {
+    return MetastoreConf.getTimeVar(getConf(), 
MetastoreConf.ConfVars.COMPACTION_HOUSEKEEPER_SERVICE_INTERVAL,
+        unit);
+  }
+}
diff --git 
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/conf/TestMetastoreConf.java
 
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/conf/TestMetastoreConf.java
index a5a8145ccbd..b5208c7fe75 100644
--- 
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/conf/TestMetastoreConf.java
+++ 
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/conf/TestMetastoreConf.java
@@ -20,7 +20,8 @@ package org.apache.hadoop.hive.metastore.conf;
 import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.txn.AcidTxnCleanerService;
+import 
org.apache.hadoop.hive.metastore.txn.service.CompactionHouseKeeperService;
+import org.apache.hadoop.hive.metastore.txn.service.AcidTxnCleanerService;
 import org.hamcrest.CoreMatchers;
 import org.hamcrest.core.StringContains;
 import org.hamcrest.core.StringEndsWith;
@@ -50,8 +51,8 @@ import 
org.apache.hadoop.hive.metastore.RuntimeStatsCleanerTask;
 import org.apache.hadoop.hive.metastore.SerDeStorageSchemaReader;
 import org.apache.hadoop.hive.metastore.events.EventCleanerTask;
 import 
org.apache.hadoop.hive.metastore.security.MetastoreDelegationTokenManager;
-import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
-import org.apache.hadoop.hive.metastore.txn.AcidOpenTxnsCounterService;
+import org.apache.hadoop.hive.metastore.txn.service.AcidHouseKeeperService;
+import org.apache.hadoop.hive.metastore.txn.service.AcidOpenTxnsCounterService;
 
 @Category(MetastoreUnitTest.class)
 public class TestMetastoreConf {
@@ -500,8 +501,10 @@ public class TestMetastoreConf {
         EventCleanerTask.class.getName());
     Assert.assertEquals(MetastoreConf.METASTORE_DELEGATION_MANAGER_CLASS,
         MetastoreDelegationTokenManager.class.getName());
-    Assert.assertEquals(MetastoreConf.ACID_HOUSE_KEEPER_SERVICE_CLASS,
+    Assert.assertEquals(MetastoreConf.ACID_HOUSEKEEPER_SERVICE_CLASS,
         AcidHouseKeeperService.class.getName());
+    Assert.assertEquals(MetastoreConf.COMPACTION_HOUSEKEEPER_SERVICE_CLASS,
+        CompactionHouseKeeperService.class.getName());
     Assert.assertEquals(MetastoreConf.ACID_TXN_CLEANER_SERVICE_CLASS,
         AcidTxnCleanerService.class.getName());
     Assert.assertEquals(MetastoreConf.ACID_OPEN_TXNS_COUNTER_SERVICE_CLASS,
diff --git 
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/TestAcidTxnCleanerService.java
 
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/TestAcidTxnCleanerService.java
index 178b68fdfce..0e27bf393f4 100644
--- 
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/TestAcidTxnCleanerService.java
+++ 
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/TestAcidTxnCleanerService.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
 import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
 import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.txn.service.AcidTxnCleanerService;
 import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
 import org.junit.After;
 import org.junit.Assert;
diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java 
b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
index 16c002cd572..5ee780005a6 100644
--- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
+++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
@@ -73,7 +73,7 @@ import 
org.apache.hadoop.hive.metastore.api.TxnAbortedException;
 import org.apache.hadoop.hive.metastore.api.TxnInfo;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
+import org.apache.hadoop.hive.metastore.txn.service.AcidHouseKeeperService;
 import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
 import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;

Reply via email to