This is an automated email from the ASF dual-hosted git repository.

klcopp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a53540  HIVE-25737: Compaction Observability: 
Initiator/Worker/Cleaner cycle measurement improvements (Viktor Csomor, 
reviewed by Laszlo Pinter and Karen Coppage)
6a53540 is described below

commit 6a535403a49b972bb0a488d65bab06c57e3c8c66
Author: Viktor Csomor <csomor.vik...@gmail.com>
AuthorDate: Thu Dec 9 09:32:33 2021 +0100

    HIVE-25737: Compaction Observability: Initiator/Worker/Cleaner cycle 
measurement improvements (Viktor Csomor, reviewed by Laszlo Pinter and Karen 
Coppage)
    
    Closes #2827.
    
    A daemon thread has been implemented for the Initiator that measures the 
elapsed time since its start.
    The PerformanceLogger approach is also kept, but the metrics are intended 
to use the gauge style
    
    The Age of Oldest Working Compaction metric has been implemented in the 
AcidMetricService
    
    The Age of Oldest active Cleaner metric has been implemented
    - CQ_CLEANER_START field added to the COMPACTION_QUEUE table
    - COMPACTION_OLDEST_CLEANING_AGE metric has been added
    - markCleaning method added to the CompactionTxnHandler
    - ShowCompactResponseElement extended with the cleanerStart field
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |  16 ++
 .../upgrade/hive/hive-schema-4.0.0.hive.sql        |  21 ++-
 .../upgrade/hive/upgrade-3.1.0-to-4.0.0.hive.sql   |  21 ++-
 .../hadoop/hive/ql/txn/compactor/Cleaner.java      |  31 ++++
 .../hadoop/hive/ql/txn/compactor/Initiator.java    |  55 +++++-
 .../ql/txn/compactor/MetaStoreCompactorThread.java |  39 +++++
 .../ql/txn/compactor/TestCompactionMetrics.java    | 184 +++++++++++++++++----
 .../test/results/clientpositive/llap/sysdb.q.out   |  11 +-
 .../gen/thrift/gen-cpp/hive_metastore_types.cpp    |  22 +++
 .../src/gen/thrift/gen-cpp/hive_metastore_types.h  |  12 +-
 .../metastore/api/ShowCompactResponseElement.java  | 106 +++++++++++-
 .../metastore/ShowCompactResponseElement.php       |  24 +++
 .../src/gen/thrift/gen-py/hive_metastore/ttypes.py |  14 +-
 .../src/gen/thrift/gen-rb/hive_metastore_types.rb  |   4 +-
 .../hadoop/hive/metastore/conf/MetastoreConf.java  |  12 ++
 .../src/main/thrift/hive_metastore.thrift          |   3 +-
 .../hive/metastore/metrics/AcidMetricService.java  |  71 +++++---
 .../hive/metastore/metrics/MetricsConstants.java   |   4 +
 .../hive/metastore/txn/CompactionTxnHandler.java   | 133 +++++++++++++--
 .../hadoop/hive/metastore/txn/TxnHandler.java      |  39 +++--
 .../apache/hadoop/hive/metastore/txn/TxnStore.java |  66 +++++++-
 .../src/main/sql/derby/hive-schema-4.0.0.derby.sql |   3 +-
 .../sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql     |   3 +
 .../src/main/sql/mssql/hive-schema-4.0.0.mssql.sql |   1 +
 .../sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql     |   3 +
 .../src/main/sql/mysql/hive-schema-4.0.0.mysql.sql |   3 +-
 .../sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql     |   3 +
 .../main/sql/oracle/hive-schema-4.0.0.oracle.sql   |   3 +-
 .../sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql   |   4 +-
 .../sql/postgres/hive-schema-4.0.0.postgres.sql    |   3 +-
 .../postgres/upgrade-3.2.0-to-4.0.0.postgres.sql   |   4 +-
 .../upgrade-3.1.3000-to-4.0.0.postgres.sql         |   3 +
 32 files changed, 800 insertions(+), 121 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 988cec8..d9eec2e 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3190,6 +3190,22 @@ public class HiveConf extends Configuration {
         "has had a transaction done on it since the last major compaction. So 
decreasing this\n" +
         "value will increase the load on the NameNode."),
 
+    
HIVE_COMPACTOR_INITIATOR_DURATION_UPDATE_INTERVAL("hive.compactor.initiator.duration.update.interval",
 "60s",
+        new TimeValidator(TimeUnit.SECONDS),
+        "Time in seconds that drives the update interval of 
compaction_initiator_duration metric.\n" +
+            "Smaller value results in a fine grained metric update.\n" +
+            "This updater can be turned off if its value is less than or equal 
to zero.\n"+
+            "In this case the above metric will be updated only after the 
initiator has completed one cycle.\n" +
+            "The hive.compactor.initiator.on must be turned on (true) in order 
to enable the Initiator,\n" +
+            "otherwise this setting has no effect."),
+
+    
HIVE_COMPACTOR_CLEANER_DURATION_UPDATE_INTERVAL("hive.compactor.cleaner.duration.update.interval",
 "60s",
+        new TimeValidator(TimeUnit.SECONDS),
+        "Time in seconds that drives the update interval of 
compaction_cleaner_duration metric.\n" +
+            "Smaller value results in a fine grained metric update.\n" +
+            "This updater can be turned off if its value is less than or equal 
to zero.\n"+
+            "In this case the above metric will be updated only after the 
cleaner has completed one cycle."),
+
     HIVE_COMPACTOR_REQUEST_QUEUE("hive.compactor.request.queue", 1,
         "Enables parallelization of the checkForCompaction operation, that 
includes many file metadata checks\n" +
         "and may be expensive"),
diff --git a/metastore/scripts/upgrade/hive/hive-schema-4.0.0.hive.sql 
b/metastore/scripts/upgrade/hive/hive-schema-4.0.0.hive.sql
index 9de3488..d0654a5 100644
--- a/metastore/scripts/upgrade/hive/hive-schema-4.0.0.hive.sql
+++ b/metastore/scripts/upgrade/hive/hive-schema-4.0.0.hive.sql
@@ -1092,7 +1092,8 @@ CREATE EXTERNAL TABLE IF NOT EXISTS `COMPACTION_QUEUE` (
   `CQ_ERROR_MESSAGE` string,
   `CQ_INITIATOR_ID` string,
   `CQ_INITIATOR_VERSION` string,
-  `CQ_WORKER_VERSION` string
+  `CQ_WORKER_VERSION` string,
+  `CQ_CLEANER_START` bigint
 )
 STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
 TBLPROPERTIES (
@@ -1115,7 +1116,8 @@ TBLPROPERTIES (
   \"COMPACTION_QUEUE\".\"CQ_ERROR_MESSAGE\",
   \"COMPACTION_QUEUE\".\"CQ_INITIATOR_ID\",
   \"COMPACTION_QUEUE\".\"CQ_INITIATOR_VERSION\",
-  \"COMPACTION_QUEUE\".\"CQ_WORKER_VERSION\"
+  \"COMPACTION_QUEUE\".\"CQ_WORKER_VERSION\",
+  \"COMPACTION_QUEUE\".\"CQ_CLEANER_START\"
 FROM \"COMPACTION_QUEUE\"
 "
 );
@@ -1187,7 +1189,8 @@ CREATE OR REPLACE VIEW `COMPACTIONS`
   `C_HIGHEST_WRITE_ID`,
   `C_INITIATOR_HOST`,
   `C_INITIATOR_ID`,
-  `C_INITIATOR_VERSION`
+  `C_INITIATOR_VERSION`,
+  `C_CLEANER_START`
 ) AS
 SELECT
   CC_ID,
@@ -1209,7 +1212,8 @@ SELECT
   CC_HIGHEST_WRITE_ID,
   CASE WHEN CC_INITIATOR_ID IS NULL THEN cast (null as string) ELSE 
split(CC_INITIATOR_ID,"-")[0] END,
   CASE WHEN CC_INITIATOR_ID IS NULL THEN cast (null as string) ELSE 
split(CC_INITIATOR_ID,"-")[size(split(CC_INITIATOR_ID,"-"))-1] END,
-  CC_INITIATOR_VERSION
+  CC_INITIATOR_VERSION,
+  NULL
 FROM COMPLETED_COMPACTIONS
 UNION ALL
 SELECT
@@ -1231,7 +1235,8 @@ SELECT
   CQ_HIGHEST_WRITE_ID,
   CASE WHEN CQ_INITIATOR_ID IS NULL THEN NULL ELSE 
split(CQ_INITIATOR_ID,"-")[0] END,
   CASE WHEN CQ_INITIATOR_ID IS NULL THEN NULL ELSE 
split(CQ_INITIATOR_ID,"-")[size(split(CQ_INITIATOR_ID,"-"))-1] END,
-  CQ_INITIATOR_VERSION
+  CQ_INITIATOR_VERSION,
+  CQ_CLEANER_START
 FROM COMPACTION_QUEUE;
 
 CREATE EXTERNAL TABLE IF NOT EXISTS `SCHEDULED_QUERIES` (
@@ -1874,7 +1879,8 @@ CREATE OR REPLACE VIEW `COMPACTIONS`
   `C_HIGHEST_WRITE_ID`,
   `C_INITIATOR_HOST`,
   `C_INITIATOR_ID`,
-  `C_INITIATOR_VERSION`
+  `C_INITIATOR_VERSION`,
+  `C_CLEANER_START`
 ) AS
 SELECT DISTINCT
   C_ID,
@@ -1895,7 +1901,8 @@ SELECT DISTINCT
   C_HIGHEST_WRITE_ID,
   C_INITIATOR_HOST,
   C_INITIATOR_ID,
-  C_INITIATOR_VERSION
+  C_INITIATOR_VERSION,
+  C_CLEANER_START
 FROM
   `sys`.`COMPACTIONS` C JOIN `sys`.`TBLS` T ON (C.`C_TABLE` = T.`TBL_NAME`)
                         JOIN `sys`.`DBS` D ON (C.`C_DATABASE` = D.`NAME`)
diff --git a/metastore/scripts/upgrade/hive/upgrade-3.1.0-to-4.0.0.hive.sql 
b/metastore/scripts/upgrade/hive/upgrade-3.1.0-to-4.0.0.hive.sql
index 6e3a9e1..fb76575 100644
--- a/metastore/scripts/upgrade/hive/upgrade-3.1.0-to-4.0.0.hive.sql
+++ b/metastore/scripts/upgrade/hive/upgrade-3.1.0-to-4.0.0.hive.sql
@@ -210,7 +210,8 @@ CREATE EXTERNAL TABLE IF NOT EXISTS `COMPACTION_QUEUE` (
   `CQ_ERROR_MESSAGE` string,
   `CQ_INITIATOR_ID` string,
   `CQ_INITIATOR_VERSION` string,
-  `CQ_WORKER_VERSION` string
+  `CQ_WORKER_VERSION` string,
+  `CQ_CLEANER_START` bigint
 )
 STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
 TBLPROPERTIES (
@@ -233,7 +234,8 @@ TBLPROPERTIES (
   \"COMPACTION_QUEUE\".\"CQ_ERROR_MESSAGE\",
   \"COMPACTION_QUEUE\".\"CQ_INITIATOR_ID\",
   \"COMPACTION_QUEUE\".\"CQ_INITIATOR_VERSION\",
-  \"COMPACTION_QUEUE\".\"CQ_WORKER_VERSION\"
+  \"COMPACTION_QUEUE\".\"CQ_WORKER_VERSION\",
+  \"COMPACTION_QUEUE\".\"CQ_CLEANER_START\"
 FROM \"COMPACTION_QUEUE\"
 "
 );
@@ -305,7 +307,8 @@ CREATE OR REPLACE VIEW `COMPACTIONS`
   `C_HIGHEST_WRITE_ID`,
   `C_INITIATOR_HOST`,
   `C_INITIATOR_ID`,
-  `C_INITIATOR_VERSION`
+  `C_INITIATOR_VERSION`,
+  `C_CLEANER_START`
 ) AS
 SELECT
   CC_ID,
@@ -327,7 +330,8 @@ SELECT
   CC_HIGHEST_WRITE_ID,
   CASE WHEN CC_INITIATOR_ID IS NULL THEN cast (null as string) ELSE 
split(CC_INITIATOR_ID,"-")[0] END,
   CASE WHEN CC_INITIATOR_ID IS NULL THEN cast (null as string) ELSE 
split(CC_INITIATOR_ID,"-")[size(split(CC_INITIATOR_ID,"-"))-1] END,
-  CC_INITIATOR_VERSION
+  CC_INITIATOR_VERSION,
+  NULL
 FROM COMPLETED_COMPACTIONS
 UNION ALL
 SELECT
@@ -349,7 +353,8 @@ SELECT
   CQ_HIGHEST_WRITE_ID,
   CASE WHEN CQ_INITIATOR_ID IS NULL THEN NULL ELSE 
split(CQ_INITIATOR_ID,"-")[0] END,
   CASE WHEN CQ_INITIATOR_ID IS NULL THEN NULL ELSE 
split(CQ_INITIATOR_ID,"-")[size(split(CQ_INITIATOR_ID,"-"))-1] END,
-  CQ_INITIATOR_VERSION
+  CQ_INITIATOR_VERSION,
+  CQ_CLEANER_START
 FROM COMPACTION_QUEUE;
 
 -- HIVE-22553
@@ -841,7 +846,8 @@ CREATE OR REPLACE VIEW `COMPACTIONS`
   `C_HIGHEST_WRITE_ID`,
   `C_INITIATOR_HOST`,
   `C_INITIATOR_ID`,
-  `C_INITIATOR_VERSION`
+  `C_INITIATOR_VERSION`,
+  `C_CLEANER_START`
 ) AS
 SELECT DISTINCT
   C_ID,
@@ -862,7 +868,8 @@ SELECT DISTINCT
   C_HIGHEST_WRITE_ID,
   C_INITIATOR_HOST,
   C_INITIATOR_ID,
-  C_INITIATOR_VERSION
+  C_INITIATOR_VERSION,
+  C_CLEANER_START
 FROM
   `sys`.`COMPACTIONS` C JOIN `sys`.`TBLS` T ON (C.`C_TABLE` = T.`TBL_NAME`)
                         JOIN `sys`.`DBS` D ON (C.`C_DATABASE` = D.`NAME`)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 8149494..38f4fec 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -113,6 +113,14 @@ public class Cleaner extends MetaStoreCompactorThread {
         try {
           handle = 
txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
           startedAt = System.currentTimeMillis();
+
+          if (metricsEnabled) {
+            stopCycleUpdater();
+            startCycleUpdater(HiveConf.getTimeVar(conf,
+                    
HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_DURATION_UPDATE_INTERVAL, 
TimeUnit.MILLISECONDS),
+                    new 
CleanerCycleUpdater(MetricsConstants.COMPACTION_CLEANER_CYCLE_DURATION, 
startedAt));
+          }
+
           long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
 
           List<CompactionInfo> readyToClean = 
txnHandler.findReadyToClean(minOpenTxnId, retentionTime);
@@ -145,6 +153,10 @@ public class Cleaner extends MetaStoreCompactorThread {
           if (handle != null) {
             handle.releaseLocks();
           }
+          if (metricsEnabled) {
+            
updateCycleDurationMetric(MetricsConstants.COMPACTION_CLEANER_CYCLE_DURATION, 
startedAt);
+          }
+          stopCycleUpdater();
+          }
+          stopCycleUpdater();
         }
         // Now, go back to bed until it's time to do this again
         long elapsedTime = System.currentTimeMillis() - startedAt;
@@ -204,6 +216,9 @@ public class Cleaner extends MetaStoreCompactorThread {
           return;
         }
       }
+
+      txnHandler.markCleanerStart(ci);
+
       StorageDescriptor sd = resolveStorageDescriptor(t, p);
       final String location = sd.getLocation();
       ValidTxnList validTxnList =
@@ -266,6 +281,7 @@ public class Cleaner extends MetaStoreCompactorThread {
       if (removedFiles.value || isDynPartAbort(t, ci)) {
         txnHandler.markCleaned(ci);
       } else {
+        txnHandler.clearCleanerStart(ci);
         LOG.warn("No files were removed. Leaving queue entry " + ci + " in 
ready for cleaning state.");
       }
     } catch (Exception e) {
@@ -371,4 +387,19 @@ public class Cleaner extends MetaStoreCompactorThread {
     }
     return true;
   }
+
+  private static class CleanerCycleUpdater implements Runnable {
+    private final String metric;
+    private final long startedAt;
+
+    CleanerCycleUpdater(String metric, long startedAt) {
+      this.metric = metric;
+      this.startedAt = startedAt;
+    }
+
+    @Override
+    public void run() {
+      updateCycleDurationMetric(metric, startedAt);
+    }
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index adebd31..bffa773 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -121,13 +121,24 @@ public class Initiator extends MetaStoreCompactorThread {
         // don't doom the entire thread.
         try {
           handle = 
txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Initiator.name());
+          startedAt = System.currentTimeMillis();
+          long compactionInterval = (prevStart < 0) ? prevStart : (startedAt - 
prevStart) / 1000;
+          prevStart = startedAt;
+
           if (metricsEnabled) {
             perfLogger.perfLogBegin(CLASS_NAME, 
MetricsConstants.COMPACTION_INITIATOR_CYCLE);
+            stopCycleUpdater();
+            startCycleUpdater(HiveConf.getTimeVar(conf,
+                    
HiveConf.ConfVars.HIVE_COMPACTOR_INITIATOR_DURATION_UPDATE_INTERVAL, 
TimeUnit.MILLISECONDS),
+                new 
InitiatorCycleUpdater(MetricsConstants.COMPACTION_INITIATOR_CYCLE_DURATION,
+                    startedAt,
+                    MetastoreConf.getTimeVar(conf,
+                        
MetastoreConf.ConfVars.COMPACTOR_LONG_RUNNING_INITIATOR_THRESHOLD_WARNING,
+                        TimeUnit.MILLISECONDS),
+                    MetastoreConf.getTimeVar(conf,
+                        
MetastoreConf.ConfVars.COMPACTOR_LONG_RUNNING_INITIATOR_THRESHOLD_ERROR,
+                        TimeUnit.MILLISECONDS)));
           }
-          startedAt = System.currentTimeMillis();
-
-          long compactionInterval = (prevStart < 0) ? prevStart : (startedAt - 
prevStart)/1000;
-          prevStart = startedAt;
 
           final ShowCompactResponse currentCompactions = 
txnHandler.showCompact(new ShowCompactRequest());
 
@@ -191,7 +202,9 @@ public class Initiator extends MetaStoreCompactorThread {
           }
           if (metricsEnabled) {
             perfLogger.perfLogEnd(CLASS_NAME, 
MetricsConstants.COMPACTION_INITIATOR_CYCLE);
+            
updateCycleDurationMetric(MetricsConstants.COMPACTION_INITIATOR_CYCLE_DURATION, 
startedAt);
           }
+          stopCycleUpdater();
         }
 
         long elapsedTime = System.currentTimeMillis() - startedAt;
@@ -558,4 +571,38 @@ public class Initiator extends MetaStoreCompactorThread {
     name.append(threadId);
     return name.toString();
   }
+
+  private static class InitiatorCycleUpdater implements Runnable {
+    private final String metric;
+    private final long startedAt;
+    private final long warningThreshold;
+    private final long errorThreshold;
+
+    private boolean errorReported;
+    private boolean warningReported;
+
+    InitiatorCycleUpdater(String metric, long startedAt,
+        long warningThreshold, long errorThreshold) {
+      this.metric = metric;
+      this.startedAt = startedAt;
+      this.warningThreshold = warningThreshold;
+      this.errorThreshold = errorThreshold;
+    }
+
+    @Override
+    public void run() {
+      long elapsed = updateCycleDurationMetric(metric, startedAt);
+      if (elapsed >= errorThreshold) {
+        if (!errorReported) {
+          LOG.error("Long running Initiator has been detected, duration {}", 
elapsed);
+          errorReported = true;
+        }
+      } else if (elapsed >= warningThreshold) {
+        if (!warningReported && !errorReported) {
+          warningReported = true;
+          LOG.warn("Long running Initiator has been detected, duration {}", 
elapsed);
+        }
+      }
+    }
+  }
 }
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
index 7bf1304..4184caf 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
@@ -17,12 +17,14 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.hive.metastore.MetaStoreThread;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.metrics.Metrics;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
@@ -31,6 +33,9 @@ import org.apache.thrift.TException;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.hadoop.hive.metastore.HMSHandler.getMSForConf;
@@ -44,6 +49,7 @@ public class MetaStoreCompactorThread extends CompactorThread 
implements MetaSto
 
   protected TxnStore txnHandler;
   protected int threadId;
+  protected ScheduledExecutorService cycleUpdaterExecutorService;
 
   @Override
   public void setThreadId(int threadId) {
@@ -94,4 +100,37 @@ public class MetaStoreCompactorThread extends 
CompactorThread implements MetaSto
       throw new MetaException(e.toString());
     }
   }
+
+  protected void startCycleUpdater(long updateInterval, Runnable taskToRun) {
+    if (cycleUpdaterExecutorService == null) {
+      if (updateInterval > 0) {
+        cycleUpdaterExecutorService = 
Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
+            .setPriority(Thread.currentThread().getPriority())
+            .setDaemon(true)
+            .setNameFormat("Cycle-Duration-Updater-%d")
+            .build());
+        cycleUpdaterExecutorService.scheduleAtFixedRate(
+            taskToRun,
+            updateInterval, updateInterval, TimeUnit.MILLISECONDS);
+      }
+    }
+  }
+
+  protected void stopCycleUpdater() {
+    if (cycleUpdaterExecutorService != null) {
+      cycleUpdaterExecutorService.shutdownNow();
+      cycleUpdaterExecutorService = null;
+    }
+  }
+
+  protected static long updateCycleDurationMetric(String metric, long 
startedAt) {
+    if (startedAt >= 0) {
+      long elapsed = System.currentTimeMillis() - startedAt;
+      LOG.debug("Updating {} metric to {}", metric, elapsed);
+      Metrics.getOrCreateGauge(metric)
+          .set((int)elapsed);
+      return elapsed;
+    }
+    return 0;
+  }
 }
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
 
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
index 13c97fb..75c722b 100644
--- 
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
+++ 
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.txn.compactor;
 
 import com.codahale.metrics.Counter;
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hive.common.ServerUtils;
 import org.apache.hadoop.hive.common.metrics.MetricsTestUtils;
 import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
@@ -434,7 +435,7 @@ public class TestCompactionMetrics  extends CompactorTest {
     elements.add(generateElement(4,"db", "tb3", "p1", CompactionType.MINOR, 
TxnStore.FAILED_RESPONSE));
 
     elements.add(generateElement(6,"db1", "tb", null, CompactionType.MINOR, 
TxnStore.FAILED_RESPONSE,
-            System.currentTimeMillis(), true, "4.0.0", "4.0.0"));
+            System.currentTimeMillis(), true, "4.0.0", "4.0.0", 10));
     elements.add(generateElement(7,"db1", "tb2", null, CompactionType.MINOR, 
TxnStore.FAILED_RESPONSE));
     elements.add(generateElement(8,"db1", "tb3", null, CompactionType.MINOR, 
TxnStore.FAILED_RESPONSE));
 
@@ -446,12 +447,12 @@ public class TestCompactionMetrics  extends CompactorTest 
{
     elements.add(generateElement(13,"db3", "tb3", null, CompactionType.MINOR, 
TxnStore.WORKING_RESPONSE));
     // test null initiator version and worker version
     elements.add(generateElement(14,"db3", "tb4", null, CompactionType.MINOR, 
TxnStore.WORKING_RESPONSE,
-            System.currentTimeMillis(), false, null, null));
+            System.currentTimeMillis(), false, null, null,20));
     elements.add(generateElement(15,"db3", "tb5", null, CompactionType.MINOR, 
TxnStore.WORKING_RESPONSE,
-            System.currentTimeMillis(),true, "4.0.0", "4.0.0"));
+            System.currentTimeMillis(),true, "4.0.0", "4.0.0", 30));
     elements.add(generateElement(16,"db3", "tb6", null, CompactionType.MINOR, 
TxnStore.WORKING_RESPONSE));
     elements.add(generateElement(17,"db3", "tb7", null, CompactionType.MINOR, 
TxnStore.WORKING_RESPONSE,
-            System.currentTimeMillis(),true, "4.0.0", "4.0.0"));
+            System.currentTimeMillis(),true, "4.0.0", "4.0.0",40));
 
     scr.setCompacts(elements);
     AcidMetricService.updateMetricsFromShowCompact(scr, conf);
@@ -483,44 +484,91 @@ public class TestCompactionMetrics  extends CompactorTest 
{
 
   @Test
   public void testAgeMetricsNotSet() {
-    ShowCompactResponse scr = new ShowCompactResponse();
-    List<ShowCompactResponseElement> elements = new ArrayList<>();
-    elements.add(generateElement(1, "db", "tb", null, CompactionType.MAJOR, 
TxnStore.FAILED_RESPONSE, 1L));
-    elements.add(generateElement(5, "db", "tb3", "p1", CompactionType.MINOR, 
TxnStore.DID_NOT_INITIATE_RESPONSE, 2L));
-    elements.add(generateElement(9, "db2", "tb", null, CompactionType.MINOR, 
TxnStore.SUCCEEDED_RESPONSE, 3L));
-    elements.add(generateElement(13, "db3", "tb3", null, CompactionType.MINOR, 
TxnStore.WORKING_RESPONSE, 4L));
-    elements.add(generateElement(14, "db3", "tb4", null, CompactionType.MINOR, 
TxnStore.CLEANING_RESPONSE, 5L));
+    List<ShowCompactResponseElement> elements = ImmutableList.of(
+        generateElement(1, "db", "tb", null, CompactionType.MAJOR, 
TxnStore.FAILED_RESPONSE, 1L),
+        generateElement(5, "db", "tb3", "p1", CompactionType.MINOR, 
TxnStore.DID_NOT_INITIATE_RESPONSE, 2L),
+        generateElement(9, "db2", "tb", null, CompactionType.MINOR, 
TxnStore.SUCCEEDED_RESPONSE, 3L)
+    );
 
+    ShowCompactResponse scr = new ShowCompactResponse();
     scr.setCompacts(elements);
     AcidMetricService.updateMetricsFromShowCompact(scr, conf);
+
     // Check that it is not set
     Assert.assertEquals(0, 
Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_ENQUEUE_AGE).intValue());
+    Assert.assertEquals(0, 
Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_WORKING_AGE).intValue());
+    Assert.assertEquals(0, 
Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_CLEANING_AGE).intValue());
   }
 
   @Test
-  public void testAgeMetricsAge() {
+  public void testInitiatedAgeMetrics() {
     ShowCompactResponse scr = new ShowCompactResponse();
-    List<ShowCompactResponseElement> elements = new ArrayList<>();
     long start = System.currentTimeMillis() - 1000L;
-    elements.add(generateElement(15,"db3", "tb5", null, CompactionType.MINOR, 
TxnStore.INITIATED_RESPONSE, start));
+    List<ShowCompactResponseElement> elements = ImmutableList.of(
+        generateElement(15, "db3", "tb5", null, CompactionType.MINOR, 
TxnStore.INITIATED_RESPONSE, start)
+    );
 
     scr.setCompacts(elements);
     AcidMetricService.updateMetricsFromShowCompact(scr, conf);
-    long diff = (System.currentTimeMillis() - start)/1000;
+    long diff = (System.currentTimeMillis() - start) / 1000;
+
     // Check that we have at least 1s old compaction age, but not more than 
expected
-    
Assert.assertTrue(Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_ENQUEUE_AGE).intValue()
 <= diff);
-    
Assert.assertTrue(Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_ENQUEUE_AGE).intValue()
 >= 1);
+    int age = 
Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_ENQUEUE_AGE).intValue();
+    Assert.assertTrue(age <= diff);
+    Assert.assertTrue(age >= 1);
   }
 
   @Test
-  public void testAgeMetricsOrder() {
+  public void testWorkingAgeMetrics() {
+    ShowCompactResponse scr = new ShowCompactResponse();
+
+    long start = System.currentTimeMillis() - 1000L;
+    List<ShowCompactResponseElement> elements = ImmutableList.of(
+        generateElement(17, "db3", "tb7", null, CompactionType.MINOR, 
TxnStore.WORKING_RESPONSE,
+            System.currentTimeMillis(), true, "4.0.0", "4.0.0", start)
+    );
+
+    scr.setCompacts(elements);
+    AcidMetricService.updateMetricsFromShowCompact(scr, conf);
+    long diff = (System.currentTimeMillis() - start) / 1000;
+
+    // Check that we have at least 1s old compaction age, but not more than 
expected
+    int age = 
Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_WORKING_AGE).intValue();
+    Assert.assertTrue(age <= diff);
+    Assert.assertTrue(age >= 1);
+  }
+
+  @Test
+  public void testCleaningAgeMetrics() {
+    ShowCompactResponse scr = new ShowCompactResponse();
+
+    long start = System.currentTimeMillis() - 1000L;
+    List<ShowCompactResponseElement> elements = ImmutableList.of(
+        generateElement(19, "db3", "tb7", null, CompactionType.MINOR, 
TxnStore.CLEANING_RESPONSE,
+            System.currentTimeMillis(), true, "4.0.0", "4.0.0", -1L, start)
+    );
+
+    scr.setCompacts(elements);
+    AcidMetricService.updateMetricsFromShowCompact(scr, conf);
+    long diff = (System.currentTimeMillis() - start) / 1000;
+
+    // Check that we have at least 1s old compaction age, but not more than 
expected
+    int age = 
Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_CLEANING_AGE).intValue();
+    Assert.assertTrue(age <= diff);
+    Assert.assertTrue(age >= 1);
+  }
+
+  @Test
+  public void testInitiatedAgeMetricsOrder() {
     ShowCompactResponse scr = new ShowCompactResponse();
     long start = System.currentTimeMillis();
-    List<ShowCompactResponseElement> elements = new ArrayList<>();
-    elements.add(generateElement(15,"db3", "tb5", null, CompactionType.MINOR, 
TxnStore.INITIATED_RESPONSE,
-        start - 1000L));
-    elements.add(generateElement(16,"db3", "tb6", null, CompactionType.MINOR, 
TxnStore.INITIATED_RESPONSE,
-        start - 100000L));
+
+    List<ShowCompactResponseElement> elements = ImmutableList.of(
+        generateElement(15, "db3", "tb5", null, CompactionType.MINOR, 
TxnStore.INITIATED_RESPONSE,
+            start - 1_000L),
+        generateElement(16, "db3", "tb6", null, CompactionType.MINOR, 
TxnStore.INITIATED_RESPONSE,
+            start - 15_000L)
+    );
 
     scr.setCompacts(elements);
     AcidMetricService.updateMetricsFromShowCompact(scr, conf);
@@ -528,14 +576,79 @@ public class TestCompactionMetrics  extends CompactorTest 
{
     
Assert.assertTrue(Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_ENQUEUE_AGE).intValue()
 > 10);
 
     // Check the reverse order
-    elements = new ArrayList<>();
-    elements.add(generateElement(16,"db3", "tb6", null, CompactionType.MINOR, 
TxnStore.INITIATED_RESPONSE,
-        start - 100000L));
-    elements.add(generateElement(15,"db3", "tb5", null, CompactionType.MINOR, 
TxnStore.INITIATED_RESPONSE,
-        start - 1000L));
+    elements = ImmutableList.of(
+        generateElement(16, "db3", "tb6", null, CompactionType.MINOR, 
TxnStore.INITIATED_RESPONSE,
+            start - 25_000L),
+        generateElement(15, "db3", "tb5", null, CompactionType.MINOR, 
TxnStore.INITIATED_RESPONSE,
+            start - 1_000L)
+    );
+    scr.setCompacts(elements);
+    AcidMetricService.updateMetricsFromShowCompact(scr, conf);
+
+    // Check that the age is older than 20s
+    
Assert.assertTrue(Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_ENQUEUE_AGE).intValue()
 > 20);
+  }
+
+  @Test
+  public void testWorkingAgeMetricsOrder() {
+    ShowCompactResponse scr = new ShowCompactResponse();
+    long start = System.currentTimeMillis();
+
+    List<ShowCompactResponseElement> elements = ImmutableList.of(
+        generateElement(15, "db3", "tb5", null, CompactionType.MINOR, 
TxnStore.WORKING_RESPONSE,
+            start, false, "4.0.0", "4.0.0", start - 1_000L),
+        generateElement(16, "db3", "tb6", null, CompactionType.MINOR, 
TxnStore.WORKING_RESPONSE,
+            start, false, "4.0.0", "4.0.0", start - 15_000L)
+    );
+
+    scr.setCompacts(elements);
+    AcidMetricService.updateMetricsFromShowCompact(scr, conf);
+    // Check that the age is older than 10s
+    
Assert.assertTrue(Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_WORKING_AGE).intValue()
 > 10);
+
+    // Check the reverse order
+    elements = ImmutableList.of(
+        generateElement(16, "db3", "tb6", null, CompactionType.MINOR, 
TxnStore.WORKING_RESPONSE,
+            start, false, "4.0.0", "4.0.0", start - 25_000L),
+        generateElement(15, "db3", "tb5", null, CompactionType.MINOR, 
TxnStore.WORKING_RESPONSE,
+            start, false, "4.0.0", "4.0.0", start - 1_000L)
+    );
+    scr.setCompacts(elements);
+    AcidMetricService.updateMetricsFromShowCompact(scr, conf);
+
+    // Check that the age is older than 20s
+    
Assert.assertTrue(Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_WORKING_AGE).intValue()
 > 20);
+  }
 
+  @Test
+  public void testCleaningAgeMetricsOrder() {
+    ShowCompactResponse scr = new ShowCompactResponse();
+    long start = System.currentTimeMillis();
+
+    List<ShowCompactResponseElement> elements = ImmutableList.of(
+        generateElement(15, "db3", "tb5", null, CompactionType.MINOR, 
TxnStore.CLEANING_RESPONSE,
+            start, false, "4.0.0", "4.0.0", -1L, start - 1_000L),
+        generateElement(16, "db3", "tb6", null, CompactionType.MINOR, 
TxnStore.CLEANING_RESPONSE,
+            start, false, "4.0.0", "4.0.0", -1L, start - 15_000L)
+    );
+
+    scr.setCompacts(elements);
+    AcidMetricService.updateMetricsFromShowCompact(scr, conf);
     // Check that the age is older than 10s
-    
Assert.assertTrue(Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_ENQUEUE_AGE).intValue()
 > 10);
+    
Assert.assertTrue(Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_CLEANING_AGE).intValue()
 > 10);
+
+    // Check the reverse order
+    elements = ImmutableList.of(
+        generateElement(16, "db3", "tb6", null, CompactionType.MINOR, 
TxnStore.CLEANING_RESPONSE,
+            start, false, "4.0.0", "4.0.0", -1L, start - 25_000L),
+        generateElement(15, "db3", "tb5", null, CompactionType.MINOR, 
TxnStore.CLEANING_RESPONSE,
+            start, false, "4.0.0", "4.0.0", -1L, start - 1_000L)
+    );
+    scr.setCompacts(elements);
+    AcidMetricService.updateMetricsFromShowCompact(scr, conf);
+
+    // Check that the age is older than 20s
+    
Assert.assertTrue(Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_CLEANING_AGE).intValue()
 > 20);
   }
 
   @Test
@@ -741,12 +854,19 @@ public class TestCompactionMetrics  extends CompactorTest 
{
   private ShowCompactResponseElement generateElement(long id, String db, 
String table, String partition,
       CompactionType type, String state, long enqueueTime, boolean 
manuallyInitiatedCompaction) {
     return generateElement(id, db, table, partition, type, state, enqueueTime, 
manuallyInitiatedCompaction,
-            null, null);
+            null, null, -1);
+  }
+
+  private ShowCompactResponseElement generateElement(long id, String db, 
String table, String partition,
+          CompactionType type, String state, long enqueueTime, boolean 
manuallyInitiatedCompaction,
+          String initiatorVersion, String workerVersion, long startTime) {
+    return generateElement(id, db, table, partition, type, state, enqueueTime, 
manuallyInitiatedCompaction,
+        initiatorVersion, workerVersion, startTime, -1L);
   }
 
   private ShowCompactResponseElement generateElement(long id, String db, 
String table, String partition,
           CompactionType type, String state, long enqueueTime, boolean 
manuallyInitiatedCompaction,
-          String initiatorVersion, String workerVersion) {
+          String initiatorVersion, String workerVersion, long startTime, long 
cleanerStartTime) {
     ShowCompactResponseElement element = new ShowCompactResponseElement(db, 
table, type, state);
     element.setId(id);
     element.setPartitionname(partition);
@@ -764,6 +884,8 @@ public class TestCompactionMetrics  extends CompactorTest {
     element.setWorkerid(workerId);
     element.setInitiatorVersion(initiatorVersion);
     element.setWorkerVersion(workerVersion);
+    element.setStart(startTime);
+    element.setCleanerStart(cleanerStartTime);
     return element;
   }
 
diff --git a/ql/src/test/results/clientpositive/llap/sysdb.q.out 
b/ql/src/test/results/clientpositive/llap/sysdb.q.out
index da1cf94..80b6b49 100644
--- a/ql/src/test/results/clientpositive/llap/sysdb.q.out
+++ b/ql/src/test/results/clientpositive/llap/sysdb.q.out
@@ -465,6 +465,7 @@ columns_v2  column_name
 columns_v2     comment
 columns_v2     integer_idx
 columns_v2     type_name
+compaction_queue       cq_cleaner_start
 compaction_queue       cq_database
 compaction_queue       cq_enqueue_time
 compaction_queue       cq_error_message
@@ -484,6 +485,8 @@ compaction_queue    cq_worker_id
 compaction_queue       cq_worker_version
 compactions    c_catalog
 compactions    c_catalog
+compactions    c_cleaner_start
+compactions    c_cleaner_start
 compactions    c_database
 compactions    c_database
 compactions    c_duration
@@ -1552,8 +1555,8 @@ POSTHOOK: Input: sys@compaction_queue
 POSTHOOK: Input: sys@compactions
 POSTHOOK: Input: sys@completed_compactions
 #### A masked pattern was here ####
-1      default default scr_txn NULL    major   initiated       NULL    NULL    
NULL    #Masked#        NULL    NULL    NULL    NULL    NULL    #Masked#        
manual  4.0.0-SNAPSHOT
-2      default default scr_txn_2       NULL    minor   initiated       NULL    
NULL    NULL    #Masked#        NULL    NULL    NULL    NULL    NULL    
#Masked#        manual  4.0.0-SNAPSHOT
+1      default default scr_txn NULL    major   initiated       NULL    NULL    
NULL    #Masked#        NULL    NULL    NULL    NULL    NULL    #Masked#        
manual  4.0.0-SNAPSHOT  NULL
+2      default default scr_txn_2       NULL    minor   initiated       NULL    
NULL    NULL    #Masked#        NULL    NULL    NULL    NULL    NULL    
#Masked#        manual  4.0.0-SNAPSHOT  NULL
 PREHOOK: query: use INFORMATION_SCHEMA
 PREHOOK: type: SWITCHDATABASE
 PREHOOK: Input: database:information_schema
@@ -1781,5 +1784,5 @@ POSTHOOK: Input: sys@dbs
 POSTHOOK: Input: sys@tbl_privs
 POSTHOOK: Input: sys@tbls
 #### A masked pattern was here ####
-1      default default scr_txn NULL    major   initiated       NULL    NULL    
NULL    #Masked#        NULL    NULL    NULL    NULL    NULL    #Masked#        
manual  4.0.0-SNAPSHOT
-2      default default scr_txn_2       NULL    minor   initiated       NULL    
NULL    NULL    #Masked#        NULL    NULL    NULL    NULL    NULL    
#Masked#        manual  4.0.0-SNAPSHOT
+1      default default scr_txn NULL    major   initiated       NULL    NULL    
NULL    #Masked#        NULL    NULL    NULL    NULL    NULL    #Masked#        
manual  4.0.0-SNAPSHOT  NULL
+2      default default scr_txn_2       NULL    minor   initiated       NULL    
NULL    NULL    #Masked#        NULL    NULL    NULL    NULL    NULL    
#Masked#        manual  4.0.0-SNAPSHOT  NULL
diff --git 
a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
 
b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
index 0b50045..71e30ab 100644
--- 
a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
+++ 
b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
@@ -27547,6 +27547,11 @@ void 
ShowCompactResponseElement::__set_initiatorVersion(const std::string& val)
   this->initiatorVersion = val;
 __isset.initiatorVersion = true;
 }
+
+void ShowCompactResponseElement::__set_cleanerStart(const int64_t val) {
+  this->cleanerStart = val;
+__isset.cleanerStart = true;
+}
 std::ostream& operator<<(std::ostream& out, const ShowCompactResponseElement& 
obj)
 {
   obj.printTo(out);
@@ -27725,6 +27730,14 @@ uint32_t 
ShowCompactResponseElement::read(::apache::thrift::protocol::TProtocol*
           xfer += iprot->skip(ftype);
         }
         break;
+      case 19:
+        if (ftype == ::apache::thrift::protocol::T_I64) {
+          xfer += iprot->readI64(this->cleanerStart);
+          this->__isset.cleanerStart = true;
+        } else {
+          xfer += iprot->skip(ftype);
+        }
+        break;
       default:
         xfer += iprot->skip(ftype);
         break;
@@ -27836,6 +27849,11 @@ uint32_t 
ShowCompactResponseElement::write(::apache::thrift::protocol::TProtocol
     xfer += oprot->writeString(this->initiatorVersion);
     xfer += oprot->writeFieldEnd();
   }
+  if (this->__isset.cleanerStart) {
+    xfer += oprot->writeFieldBegin("cleanerStart", 
::apache::thrift::protocol::T_I64, 19);
+    xfer += oprot->writeI64(this->cleanerStart);
+    xfer += oprot->writeFieldEnd();
+  }
   xfer += oprot->writeFieldStop();
   xfer += oprot->writeStructEnd();
   return xfer;
@@ -27861,6 +27879,7 @@ void swap(ShowCompactResponseElement &a, 
ShowCompactResponseElement &b) {
   swap(a.workerVersion, b.workerVersion);
   swap(a.initiatorId, b.initiatorId);
   swap(a.initiatorVersion, b.initiatorVersion);
+  swap(a.cleanerStart, b.cleanerStart);
   swap(a.__isset, b.__isset);
 }
 
@@ -27883,6 +27902,7 @@ 
ShowCompactResponseElement::ShowCompactResponseElement(const ShowCompactResponse
   workerVersion = other984.workerVersion;
   initiatorId = other984.initiatorId;
   initiatorVersion = other984.initiatorVersion;
+  cleanerStart = other984.cleanerStart;
   __isset = other984.__isset;
 }
 ShowCompactResponseElement& ShowCompactResponseElement::operator=(const 
ShowCompactResponseElement& other985) {
@@ -27904,6 +27924,7 @@ ShowCompactResponseElement& 
ShowCompactResponseElement::operator=(const ShowComp
   workerVersion = other985.workerVersion;
   initiatorId = other985.initiatorId;
   initiatorVersion = other985.initiatorVersion;
+  cleanerStart = other985.cleanerStart;
   __isset = other985.__isset;
   return *this;
 }
@@ -27928,6 +27949,7 @@ void ShowCompactResponseElement::printTo(std::ostream& 
out) const {
   out << ", " << "workerVersion="; (__isset.workerVersion ? (out << 
to_string(workerVersion)) : (out << "<null>"));
   out << ", " << "initiatorId="; (__isset.initiatorId ? (out << 
to_string(initiatorId)) : (out << "<null>"));
   out << ", " << "initiatorVersion="; (__isset.initiatorVersion ? (out << 
to_string(initiatorVersion)) : (out << "<null>"));
+  out << ", " << "cleanerStart="; (__isset.cleanerStart ? (out << 
to_string(cleanerStart)) : (out << "<null>"));
   out << ")";
 }
 
diff --git 
a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
 
b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
index 3e5eb35..1d4d544 100644
--- 
a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
+++ 
b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
@@ -10407,7 +10407,7 @@ void swap(ShowCompactRequest &a, ShowCompactRequest &b);
 std::ostream& operator<<(std::ostream& out, const ShowCompactRequest& obj);
 
 typedef struct _ShowCompactResponseElement__isset {
-  _ShowCompactResponseElement__isset() : partitionname(false), 
workerid(false), start(false), runAs(false), hightestTxnId(false), 
metaInfo(false), endTime(false), hadoopJobId(true), id(false), 
errorMessage(false), enqueueTime(false), workerVersion(false), 
initiatorId(false), initiatorVersion(false) {}
+  _ShowCompactResponseElement__isset() : partitionname(false), 
workerid(false), start(false), runAs(false), hightestTxnId(false), 
metaInfo(false), endTime(false), hadoopJobId(true), id(false), 
errorMessage(false), enqueueTime(false), workerVersion(false), 
initiatorId(false), initiatorVersion(false), cleanerStart(false) {}
   bool partitionname :1;
   bool workerid :1;
   bool start :1;
@@ -10422,6 +10422,7 @@ typedef struct _ShowCompactResponseElement__isset {
   bool workerVersion :1;
   bool initiatorId :1;
   bool initiatorVersion :1;
+  bool cleanerStart :1;
 } _ShowCompactResponseElement__isset;
 
 class ShowCompactResponseElement : public virtual ::apache::thrift::TBase {
@@ -10429,7 +10430,7 @@ class ShowCompactResponseElement : public virtual 
::apache::thrift::TBase {
 
   ShowCompactResponseElement(const ShowCompactResponseElement&);
   ShowCompactResponseElement& operator=(const ShowCompactResponseElement&);
-  ShowCompactResponseElement() : dbname(), tablename(), partitionname(), 
type((CompactionType::type)0), state(), workerid(), start(0), runAs(), 
hightestTxnId(0), metaInfo(), endTime(0), hadoopJobId("None"), id(0), 
errorMessage(), enqueueTime(0), workerVersion(), initiatorId(), 
initiatorVersion() {
+  ShowCompactResponseElement() : dbname(), tablename(), partitionname(), 
type((CompactionType::type)0), state(), workerid(), start(0), runAs(), 
hightestTxnId(0), metaInfo(), endTime(0), hadoopJobId("None"), id(0), 
errorMessage(), enqueueTime(0), workerVersion(), initiatorId(), 
initiatorVersion(), cleanerStart(0) {
   }
 
   virtual ~ShowCompactResponseElement() noexcept;
@@ -10455,6 +10456,7 @@ class ShowCompactResponseElement : public virtual 
::apache::thrift::TBase {
   std::string workerVersion;
   std::string initiatorId;
   std::string initiatorVersion;
+  int64_t cleanerStart;
 
   _ShowCompactResponseElement__isset __isset;
 
@@ -10494,6 +10496,8 @@ class ShowCompactResponseElement : public virtual 
::apache::thrift::TBase {
 
   void __set_initiatorVersion(const std::string& val);
 
+  void __set_cleanerStart(const int64_t val);
+
   bool operator == (const ShowCompactResponseElement & rhs) const
   {
     if (!(dbname == rhs.dbname))
@@ -10560,6 +10564,10 @@ class ShowCompactResponseElement : public virtual 
::apache::thrift::TBase {
       return false;
     else if (__isset.initiatorVersion && !(initiatorVersion == 
rhs.initiatorVersion))
       return false;
+    if (__isset.cleanerStart != rhs.__isset.cleanerStart)
+      return false;
+    else if (__isset.cleanerStart && !(cleanerStart == rhs.cleanerStart))
+      return false;
     return true;
   }
   bool operator != (const ShowCompactResponseElement &rhs) const {
diff --git 
a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ShowCompactResponseElement.java
 
b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ShowCompactResponseElement.java
index dff37fe..d0245ea 100644
--- 
a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ShowCompactResponseElement.java
+++ 
b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ShowCompactResponseElement.java
@@ -29,6 +29,7 @@ package org.apache.hadoop.hive.metastore.api;
   private static final org.apache.thrift.protocol.TField 
WORKER_VERSION_FIELD_DESC = new 
org.apache.thrift.protocol.TField("workerVersion", 
org.apache.thrift.protocol.TType.STRING, (short)16);
   private static final org.apache.thrift.protocol.TField 
INITIATOR_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("initiatorId", 
org.apache.thrift.protocol.TType.STRING, (short)17);
   private static final org.apache.thrift.protocol.TField 
INITIATOR_VERSION_FIELD_DESC = new 
org.apache.thrift.protocol.TField("initiatorVersion", 
org.apache.thrift.protocol.TType.STRING, (short)18);
+  private static final org.apache.thrift.protocol.TField 
CLEANER_START_FIELD_DESC = new 
org.apache.thrift.protocol.TField("cleanerStart", 
org.apache.thrift.protocol.TType.I64, (short)19);
 
   private static final org.apache.thrift.scheme.SchemeFactory 
STANDARD_SCHEME_FACTORY = new ShowCompactResponseElementStandardSchemeFactory();
   private static final org.apache.thrift.scheme.SchemeFactory 
TUPLE_SCHEME_FACTORY = new ShowCompactResponseElementTupleSchemeFactory();
@@ -51,6 +52,7 @@ package org.apache.hadoop.hive.metastore.api;
   private @org.apache.thrift.annotation.Nullable java.lang.String 
workerVersion; // optional
   private @org.apache.thrift.annotation.Nullable java.lang.String initiatorId; 
// optional
   private @org.apache.thrift.annotation.Nullable java.lang.String 
initiatorVersion; // optional
+  private long cleanerStart; // optional
 
   /** The set of fields this struct contains, along with convenience methods 
for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -75,7 +77,8 @@ package org.apache.hadoop.hive.metastore.api;
     ENQUEUE_TIME((short)15, "enqueueTime"),
     WORKER_VERSION((short)16, "workerVersion"),
     INITIATOR_ID((short)17, "initiatorId"),
-    INITIATOR_VERSION((short)18, "initiatorVersion");
+    INITIATOR_VERSION((short)18, "initiatorVersion"),
+    CLEANER_START((short)19, "cleanerStart");
 
     private static final java.util.Map<java.lang.String, _Fields> byName = new 
java.util.HashMap<java.lang.String, _Fields>();
 
@@ -127,6 +130,8 @@ package org.apache.hadoop.hive.metastore.api;
           return INITIATOR_ID;
         case 18: // INITIATOR_VERSION
           return INITIATOR_VERSION;
+        case 19: // CLEANER_START
+          return CLEANER_START;
         default:
           return null;
       }
@@ -173,8 +178,9 @@ package org.apache.hadoop.hive.metastore.api;
   private static final int __ENDTIME_ISSET_ID = 2;
   private static final int __ID_ISSET_ID = 3;
   private static final int __ENQUEUETIME_ISSET_ID = 4;
+  private static final int __CLEANERSTART_ISSET_ID = 5;
   private byte __isset_bitfield = 0;
-  private static final _Fields optionals[] = 
{_Fields.PARTITIONNAME,_Fields.WORKERID,_Fields.START,_Fields.RUN_AS,_Fields.HIGHTEST_TXN_ID,_Fields.META_INFO,_Fields.END_TIME,_Fields.HADOOP_JOB_ID,_Fields.ID,_Fields.ERROR_MESSAGE,_Fields.ENQUEUE_TIME,_Fields.WORKER_VERSION,_Fields.INITIATOR_ID,_Fields.INITIATOR_VERSION};
+  private static final _Fields optionals[] = 
{_Fields.PARTITIONNAME,_Fields.WORKERID,_Fields.START,_Fields.RUN_AS,_Fields.HIGHTEST_TXN_ID,_Fields.META_INFO,_Fields.END_TIME,_Fields.HADOOP_JOB_ID,_Fields.ID,_Fields.ERROR_MESSAGE,_Fields.ENQUEUE_TIME,_Fields.WORKER_VERSION,_Fields.INITIATOR_ID,_Fields.INITIATOR_VERSION,_Fields.CLEANER_START};
   public static final java.util.Map<_Fields, 
org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
   static {
     java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = 
new java.util.EnumMap<_Fields, 
org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -214,6 +220,8 @@ package org.apache.hadoop.hive.metastore.api;
         new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
     tmpMap.put(_Fields.INITIATOR_VERSION, new 
org.apache.thrift.meta_data.FieldMetaData("initiatorVersion", 
org.apache.thrift.TFieldRequirementType.OPTIONAL, 
         new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.CLEANER_START, new 
org.apache.thrift.meta_data.FieldMetaData("cleanerStart", 
org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
     metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
     
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ShowCompactResponseElement.class,
 metaDataMap);
   }
@@ -285,6 +293,7 @@ package org.apache.hadoop.hive.metastore.api;
     if (other.isSetInitiatorVersion()) {
       this.initiatorVersion = other.initiatorVersion;
     }
+    this.cleanerStart = other.cleanerStart;
   }
 
   public ShowCompactResponseElement deepCopy() {
@@ -317,6 +326,8 @@ package org.apache.hadoop.hive.metastore.api;
     this.workerVersion = null;
     this.initiatorId = null;
     this.initiatorVersion = null;
+    setCleanerStartIsSet(false);
+    this.cleanerStart = 0;
   }
 
   @org.apache.thrift.annotation.Nullable
@@ -749,6 +760,28 @@ package org.apache.hadoop.hive.metastore.api;
     }
   }
 
+  public long getCleanerStart() {
+    return this.cleanerStart;
+  }
+
+  public void setCleanerStart(long cleanerStart) {
+    this.cleanerStart = cleanerStart;
+    setCleanerStartIsSet(true);
+  }
+
+  public void unsetCleanerStart() {
+    __isset_bitfield = 
org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, 
__CLEANERSTART_ISSET_ID);
+  }
+
+  /** Returns true if field cleanerStart is set (has been assigned a value) 
and false otherwise */
+  public boolean isSetCleanerStart() {
+    return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, 
__CLEANERSTART_ISSET_ID);
+  }
+
+  public void setCleanerStartIsSet(boolean value) {
+    __isset_bitfield = 
org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, 
__CLEANERSTART_ISSET_ID, value);
+  }
+
   public void setFieldValue(_Fields field, 
@org.apache.thrift.annotation.Nullable java.lang.Object value) {
     switch (field) {
     case DBNAME:
@@ -895,6 +928,14 @@ package org.apache.hadoop.hive.metastore.api;
       }
       break;
 
+    case CLEANER_START:
+      if (value == null) {
+        unsetCleanerStart();
+      } else {
+        setCleanerStart((java.lang.Long)value);
+      }
+      break;
+
     }
   }
 
@@ -955,6 +996,9 @@ package org.apache.hadoop.hive.metastore.api;
     case INITIATOR_VERSION:
       return getInitiatorVersion();
 
+    case CLEANER_START:
+      return getCleanerStart();
+
     }
     throw new java.lang.IllegalStateException();
   }
@@ -1002,6 +1046,8 @@ package org.apache.hadoop.hive.metastore.api;
       return isSetInitiatorId();
     case INITIATOR_VERSION:
       return isSetInitiatorVersion();
+    case CLEANER_START:
+      return isSetCleanerStart();
     }
     throw new java.lang.IllegalStateException();
   }
@@ -1181,6 +1227,15 @@ package org.apache.hadoop.hive.metastore.api;
         return false;
     }
 
+    boolean this_present_cleanerStart = true && this.isSetCleanerStart();
+    boolean that_present_cleanerStart = true && that.isSetCleanerStart();
+    if (this_present_cleanerStart || that_present_cleanerStart) {
+      if (!(this_present_cleanerStart && that_present_cleanerStart))
+        return false;
+      if (this.cleanerStart != that.cleanerStart)
+        return false;
+    }
+
     return true;
   }
 
@@ -1260,6 +1315,10 @@ package org.apache.hadoop.hive.metastore.api;
     if (isSetInitiatorVersion())
       hashCode = hashCode * 8191 + initiatorVersion.hashCode();
 
+    hashCode = hashCode * 8191 + ((isSetCleanerStart()) ? 131071 : 524287);
+    if (isSetCleanerStart())
+      hashCode = hashCode * 8191 + 
org.apache.thrift.TBaseHelper.hashCode(cleanerStart);
+
     return hashCode;
   }
 
@@ -1451,6 +1510,16 @@ package org.apache.hadoop.hive.metastore.api;
         return lastComparison;
       }
     }
+    lastComparison = java.lang.Boolean.compare(isSetCleanerStart(), 
other.isSetCleanerStart());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetCleanerStart()) {
+      lastComparison = 
org.apache.thrift.TBaseHelper.compareTo(this.cleanerStart, other.cleanerStart);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -1623,6 +1692,12 @@ package org.apache.hadoop.hive.metastore.api;
       }
       first = false;
     }
+    if (isSetCleanerStart()) {
+      if (!first) sb.append(", ");
+      sb.append("cleanerStart:");
+      sb.append(this.cleanerStart);
+      first = false;
+    }
     sb.append(")");
     return sb.toString();
   }
@@ -1828,6 +1903,14 @@ package org.apache.hadoop.hive.metastore.api;
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
             }
             break;
+          case 19: // CLEANER_START
+            if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+              struct.cleanerStart = iprot.readI64();
+              struct.setCleanerStartIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+            }
+            break;
           default:
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
         }
@@ -1949,6 +2032,11 @@ package org.apache.hadoop.hive.metastore.api;
           oprot.writeFieldEnd();
         }
       }
+      if (struct.isSetCleanerStart()) {
+        oprot.writeFieldBegin(CLEANER_START_FIELD_DESC);
+        oprot.writeI64(struct.cleanerStart);
+        oprot.writeFieldEnd();
+      }
       oprot.writeFieldStop();
       oprot.writeStructEnd();
     }
@@ -2013,7 +2101,10 @@ package org.apache.hadoop.hive.metastore.api;
       if (struct.isSetInitiatorVersion()) {
         optionals.set(13);
       }
-      oprot.writeBitSet(optionals, 14);
+      if (struct.isSetCleanerStart()) {
+        optionals.set(14);
+      }
+      oprot.writeBitSet(optionals, 15);
       if (struct.isSetPartitionname()) {
         oprot.writeString(struct.partitionname);
       }
@@ -2056,6 +2147,9 @@ package org.apache.hadoop.hive.metastore.api;
       if (struct.isSetInitiatorVersion()) {
         oprot.writeString(struct.initiatorVersion);
       }
+      if (struct.isSetCleanerStart()) {
+        oprot.writeI64(struct.cleanerStart);
+      }
     }
 
     @Override
@@ -2069,7 +2163,7 @@ package org.apache.hadoop.hive.metastore.api;
       struct.setTypeIsSet(true);
       struct.state = iprot.readString();
       struct.setStateIsSet(true);
-      java.util.BitSet incoming = iprot.readBitSet(14);
+      java.util.BitSet incoming = iprot.readBitSet(15);
       if (incoming.get(0)) {
         struct.partitionname = iprot.readString();
         struct.setPartitionnameIsSet(true);
@@ -2126,6 +2220,10 @@ package org.apache.hadoop.hive.metastore.api;
         struct.initiatorVersion = iprot.readString();
         struct.setInitiatorVersionIsSet(true);
       }
+      if (incoming.get(14)) {
+        struct.cleanerStart = iprot.readI64();
+        struct.setCleanerStartIsSet(true);
+      }
     }
   }
 
diff --git 
a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/ShowCompactResponseElement.php
 
b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/ShowCompactResponseElement.php
index a05ccf5..a66b43f 100644
--- 
a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/ShowCompactResponseElement.php
+++ 
b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/ShowCompactResponseElement.php
@@ -112,6 +112,11 @@ class ShowCompactResponseElement
             'isRequired' => false,
             'type' => TType::STRING,
         ),
+        19 => array(
+            'var' => 'cleanerStart',
+            'isRequired' => false,
+            'type' => TType::I64,
+        ),
     );
 
     /**
@@ -186,6 +191,10 @@ class ShowCompactResponseElement
      * @var string
      */
     public $initiatorVersion = null;
+    /**
+     * @var int
+     */
+    public $cleanerStart = null;
 
     public function __construct($vals = null)
     {
@@ -244,6 +253,9 @@ class ShowCompactResponseElement
             if (isset($vals['initiatorVersion'])) {
                 $this->initiatorVersion = $vals['initiatorVersion'];
             }
+            if (isset($vals['cleanerStart'])) {
+                $this->cleanerStart = $vals['cleanerStart'];
+            }
         }
     }
 
@@ -392,6 +404,13 @@ class ShowCompactResponseElement
                         $xfer += $input->skip($ftype);
                     }
                     break;
+                case 19:
+                    if ($ftype == TType::I64) {
+                        $xfer += $input->readI64($this->cleanerStart);
+                    } else {
+                        $xfer += $input->skip($ftype);
+                    }
+                    break;
                 default:
                     $xfer += $input->skip($ftype);
                     break;
@@ -496,6 +515,11 @@ class ShowCompactResponseElement
             $xfer += $output->writeString($this->initiatorVersion);
             $xfer += $output->writeFieldEnd();
         }
+        if ($this->cleanerStart !== null) {
+            $xfer += $output->writeFieldBegin('cleanerStart', TType::I64, 19);
+            $xfer += $output->writeI64($this->cleanerStart);
+            $xfer += $output->writeFieldEnd();
+        }
         $xfer += $output->writeFieldStop();
         $xfer += $output->writeStructEnd();
         return $xfer;
diff --git 
a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
 
b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index d4a0917..f80294b 100644
--- 
a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++ 
b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@ -15751,11 +15751,12 @@ class ShowCompactResponseElement(object):
      - workerVersion
      - initiatorId
      - initiatorVersion
+     - cleanerStart
 
     """
 
 
-    def __init__(self, dbname=None, tablename=None, partitionname=None, 
type=None, state=None, workerid=None, start=None, runAs=None, 
hightestTxnId=None, metaInfo=None, endTime=None, hadoopJobId="None", id=None, 
errorMessage=None, enqueueTime=None, workerVersion=None, initiatorId=None, 
initiatorVersion=None,):
+    def __init__(self, dbname=None, tablename=None, partitionname=None, 
type=None, state=None, workerid=None, start=None, runAs=None, 
hightestTxnId=None, metaInfo=None, endTime=None, hadoopJobId="None", id=None, 
errorMessage=None, enqueueTime=None, workerVersion=None, initiatorId=None, 
initiatorVersion=None, cleanerStart=None,):
         self.dbname = dbname
         self.tablename = tablename
         self.partitionname = partitionname
@@ -15774,6 +15775,7 @@ class ShowCompactResponseElement(object):
         self.workerVersion = workerVersion
         self.initiatorId = initiatorId
         self.initiatorVersion = initiatorVersion
+        self.cleanerStart = cleanerStart
 
     def read(self, iprot):
         if iprot._fast_decode is not None and isinstance(iprot.trans, 
TTransport.CReadableTransport) and self.thrift_spec is not None:
@@ -15874,6 +15876,11 @@ class ShowCompactResponseElement(object):
                     self.initiatorVersion = iprot.readString().decode('utf-8', 
errors='replace') if sys.version_info[0] == 2 else iprot.readString()
                 else:
                     iprot.skip(ftype)
+            elif fid == 19:
+                if ftype == TType.I64:
+                    self.cleanerStart = iprot.readI64()
+                else:
+                    iprot.skip(ftype)
             else:
                 iprot.skip(ftype)
             iprot.readFieldEnd()
@@ -15956,6 +15963,10 @@ class ShowCompactResponseElement(object):
             oprot.writeFieldBegin('initiatorVersion', TType.STRING, 18)
             oprot.writeString(self.initiatorVersion.encode('utf-8') if 
sys.version_info[0] == 2 else self.initiatorVersion)
             oprot.writeFieldEnd()
+        if self.cleanerStart is not None:
+            oprot.writeFieldBegin('cleanerStart', TType.I64, 19)
+            oprot.writeI64(self.cleanerStart)
+            oprot.writeFieldEnd()
         oprot.writeFieldStop()
         oprot.writeStructEnd()
 
@@ -30425,6 +30436,7 @@ ShowCompactResponseElement.thrift_spec = (
     (16, TType.STRING, 'workerVersion', 'UTF8', None, ),  # 16
     (17, TType.STRING, 'initiatorId', 'UTF8', None, ),  # 17
     (18, TType.STRING, 'initiatorVersion', 'UTF8', None, ),  # 18
+    (19, TType.I64, 'cleanerStart', None, None, ),  # 19
 )
 all_structs.append(ShowCompactResponse)
 ShowCompactResponse.thrift_spec = (
diff --git 
a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
 
b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
index f08e16f..779898b 100644
--- 
a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ 
b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -4578,6 +4578,7 @@ class ShowCompactResponseElement
   WORKERVERSION = 16
   INITIATORID = 17
   INITIATORVERSION = 18
+  CLEANERSTART = 19
 
   FIELDS = {
     DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbname'},
@@ -4597,7 +4598,8 @@ class ShowCompactResponseElement
     ENQUEUETIME => {:type => ::Thrift::Types::I64, :name => 'enqueueTime', 
:optional => true},
     WORKERVERSION => {:type => ::Thrift::Types::STRING, :name => 
'workerVersion', :optional => true},
     INITIATORID => {:type => ::Thrift::Types::STRING, :name => 'initiatorId', 
:optional => true},
-    INITIATORVERSION => {:type => ::Thrift::Types::STRING, :name => 
'initiatorVersion', :optional => true}
+    INITIATORVERSION => {:type => ::Thrift::Types::STRING, :name => 
'initiatorVersion', :optional => true},
+    CLEANERSTART => {:type => ::Thrift::Types::I64, :name => 'cleanerStart', 
:optional => true}
   }
 
   def struct_fields; FIELDS; end
diff --git 
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 448ea6a..2feff8c 100644
--- 
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -494,6 +494,18 @@ public class MetastoreConf {
         12, TimeUnit.HOURS,
         "Age of oldest initiated compaction in the compaction queue after 
which an error will be logged. " +
             "Default time unit is: hours"),
+    COMPACTOR_LONG_RUNNING_INITIATOR_THRESHOLD_WARNING(
+        "metastore.compactor.long.running.initiator.threshold.warning",
+        "hive.compactor.long.running.initiator.threshold.warning",
+        6, TimeUnit.HOURS,
+        "Initiator cycle duration after which a warning will be logged. " +
+            "Default time unit is: hours"),
+    COMPACTOR_LONG_RUNNING_INITIATOR_THRESHOLD_ERROR(
+        "metastore.compactor.long.running.initiator.threshold.error",
+        "hive.compactor.long.running.initiator.threshold.error",
+        12, TimeUnit.HOURS,
+        "Initiator cycle duration after which an error will be logged. " +
+            "Default time unit is: hours"),
     COMPACTOR_COMPLETED_TXN_COMPONENTS_RECORD_THRESHOLD_WARNING(
         
"metastore.compactor.completed.txn.components.record.threshold.warning",
         "hive.compactor.completed.txn.components.record.threshold.warning",
diff --git 
a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift 
b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
index c33491e..7241cc6 100644
--- 
a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
+++ 
b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
@@ -1327,7 +1327,8 @@ struct ShowCompactResponseElement {
     15: optional i64 enqueueTime,
     16: optional string workerVersion,
     17: optional string initiatorId,
-    18: optional string initiatorVersion
+    18: optional string initiatorVersion,
+    19: optional i64 cleanerStart
 }
 
 struct ShowCompactResponse {
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/AcidMetricService.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/AcidMetricService.java
index 7870c51..ec8208c 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/AcidMetricService.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/AcidMetricService.java
@@ -45,7 +45,9 @@ import static 
org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTI
 import static 
org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_NUM_INITIATOR_VERSIONS;
 import static 
org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_NUM_WORKERS;
 import static 
org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_NUM_WORKER_VERSIONS;
+import static 
org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_OLDEST_CLEANING_AGE;
 import static 
org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_OLDEST_ENQUEUE_AGE;
+import static 
org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_OLDEST_WORKING_AGE;
 import static 
org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_STATUS_PREFIX;
 import static 
org.apache.hadoop.hive.metastore.metrics.MetricsConstants.NUM_ABORTED_TXNS;
 import static 
org.apache.hadoop.hive.metastore.metrics.MetricsConstants.NUM_COMPLETED_TXN_COMPONENTS;
@@ -268,16 +270,34 @@ public class AcidMetricService implements 
MetastoreTaskThread {
   public static void updateMetricsFromShowCompact(ShowCompactResponse 
showCompactResponse, Configuration conf) {
     Map<String, ShowCompactResponseElement> lastElements = new HashMap<>();
     long oldestEnqueueTime = Long.MAX_VALUE;
+    long oldestWorkingTime = Long.MAX_VALUE;
+    long oldestCleaningTime = Long.MAX_VALUE;
 
     // Get the last compaction for each db/table/partition
     for(ShowCompactResponseElement element : 
showCompactResponse.getCompacts()) {
       String key = element.getDbname() + "/" + element.getTablename() +
           (element.getPartitionname() != null ? "/" + 
element.getPartitionname() : "");
+
       // If new key, add the element, if there is an existing one, change to 
the element if the element.id is greater than old.id
       lastElements.compute(key, (k, old) -> (old == null) ? element : 
(element.getId() > old.getId() ? element : old));
-      if (TxnStore.INITIATED_RESPONSE.equals(element.getState()) && 
oldestEnqueueTime > element.getEnqueueTime()) {
+
+      // find the oldest elements with initiated and working states
+      String state = element.getState();
+      if (TxnStore.INITIATED_RESPONSE.equals(state) && (oldestEnqueueTime > 
element.getEnqueueTime())) {
         oldestEnqueueTime = element.getEnqueueTime();
       }
+
+      if (element.isSetStart()) {
+        if (TxnStore.WORKING_RESPONSE.equals(state) && (oldestWorkingTime > 
element.getStart())) {
+          oldestWorkingTime = element.getStart();
+        }
+      }
+
+      if (element.isSetCleanerStart()) {
+        if (TxnStore.CLEANING_RESPONSE.equals(state) && (oldestCleaningTime > 
element.getCleanerStart())) {
+          oldestCleaningTime = element.getCleanerStart();
+        }
+      }
     }
 
     // Get the current count for each state
@@ -304,24 +324,13 @@ public class AcidMetricService implements 
MetastoreTaskThread {
       LOG.warn("Many compactions are failing. Check root cause of failed/not 
initiated compactions.");
     }
 
-    if (oldestEnqueueTime == Long.MAX_VALUE) {
-      Metrics.getOrCreateGauge(COMPACTION_OLDEST_ENQUEUE_AGE).set(0);
-    } else {
-      int oldestEnqueueAge = (int) ((System.currentTimeMillis() - 
oldestEnqueueTime) / 1000L);
-      Metrics.getOrCreateGauge(COMPACTION_OLDEST_ENQUEUE_AGE)
-          .set(oldestEnqueueAge);
-      if (oldestEnqueueAge >= MetastoreConf.getTimeVar(conf,
-          
MetastoreConf.ConfVars.COMPACTOR_OLDEST_INITIATED_COMPACTION_TIME_THRESHOLD_WARNING,
 TimeUnit.SECONDS) &&
-          oldestEnqueueAge < MetastoreConf.getTimeVar(conf,
-              
MetastoreConf.ConfVars.COMPACTOR_OLDEST_INITIATED_COMPACTION_TIME_THRESHOLD_ERROR,
 TimeUnit.SECONDS)) {
-        LOG.warn("Found compaction entry in compaction queue with an age of " 
+ oldestEnqueueAge + " seconds. " +
-            "Consider increasing the number of worker threads.");
-      } else if (oldestEnqueueAge >= MetastoreConf.getTimeVar(conf,
-          
MetastoreConf.ConfVars.COMPACTOR_OLDEST_INITIATED_COMPACTION_TIME_THRESHOLD_ERROR,
 TimeUnit.SECONDS)) {
-        LOG.error("Found compaction entry in compaction queue with an age of " 
+ oldestEnqueueAge + " seconds. " +
-            "Consider increasing the number of worker threads");
-      }
-    }
+    updateOldestCompactionMetric(COMPACTION_OLDEST_ENQUEUE_AGE, 
oldestEnqueueTime, conf,
+        "Found compaction entry in compaction queue with an age of {} seconds. 
" +
+            "Consider increasing the number of worker threads.",
+        
MetastoreConf.ConfVars.COMPACTOR_OLDEST_INITIATED_COMPACTION_TIME_THRESHOLD_WARNING,
+        
MetastoreConf.ConfVars.COMPACTOR_OLDEST_INITIATED_COMPACTION_TIME_THRESHOLD_ERROR);
+    updateOldestCompactionMetric(COMPACTION_OLDEST_WORKING_AGE, 
oldestWorkingTime, conf);
+    updateOldestCompactionMetric(COMPACTION_OLDEST_CLEANING_AGE, 
oldestCleaningTime, conf);
 
     long initiatorsCount = lastElements.values().stream()
         //manually initiated compactions don't count
@@ -340,6 +349,30 @@ public class AcidMetricService implements 
MetastoreTaskThread {
     Metrics.getOrCreateGauge(COMPACTION_NUM_WORKER_VERSIONS).set((int) 
workerVersionsCount);
   }
 
+  private static void updateOldestCompactionMetric(String metricName, long 
oldestTime, Configuration conf) {
+    updateOldestCompactionMetric(metricName, oldestTime, conf, null, null, 
null);
+  }
+
+  private static void updateOldestCompactionMetric(String metricName, long 
oldestTime, Configuration conf,
+      String logMessage, MetastoreConf.ConfVars warningThreshold, 
MetastoreConf.ConfVars errorThreshold) {
+    if (oldestTime == Long.MAX_VALUE) {
+      Metrics.getOrCreateGauge(metricName)
+          .set(0);
+      return;
+    }
+
+    int oldestAge = (int) ((System.currentTimeMillis() - oldestTime) / 1000L);
+    Metrics.getOrCreateGauge(metricName)
+        .set(oldestAge);
+    if (logMessage != null) {
+      if (oldestAge >= MetastoreConf.getTimeVar(conf, errorThreshold, 
TimeUnit.SECONDS)) {
+        LOG.error(logMessage, oldestAge);
+      } else if (oldestAge >= MetastoreConf.getTimeVar(conf, warningThreshold, 
TimeUnit.SECONDS)) {
+        LOG.warn(logMessage, oldestAge);
+      }
+    }
+  }
+
   @Override
   public void setConf(Configuration configuration) {
     this.conf = configuration;
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/MetricsConstants.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/MetricsConstants.java
index 6676684..710f0f9 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/MetricsConstants.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/MetricsConstants.java
@@ -22,9 +22,13 @@ public class MetricsConstants {
   public static final String API_PREFIX = "api_";
   public static final String COMPACTION_STATUS_PREFIX = "compaction_num_";
   public static final String COMPACTION_OLDEST_ENQUEUE_AGE = 
"compaction_oldest_enqueue_age_in_sec";
+  public static final String COMPACTION_OLDEST_WORKING_AGE = 
"compaction_oldest_working_age_in_sec";
+  public static final String COMPACTION_OLDEST_CLEANING_AGE = 
"compaction_oldest_cleaning_age_in_sec";
   public static final String COMPACTION_INITIATOR_CYCLE = 
"compaction_initiator_cycle";
+  public static final String COMPACTION_INITIATOR_CYCLE_DURATION = 
"compaction_initiator_cycle_duration";
   public static final String COMPACTION_INITIATOR_FAILURE_COUNTER = 
"compaction_initiator_failure_counter";
   public static final String COMPACTION_CLEANER_CYCLE = 
"compaction_cleaner_cycle";
+  public static final String COMPACTION_CLEANER_CYCLE_DURATION = 
"compaction_cleaner_cycle_duration";
   public static final String COMPACTION_CLEANER_FAILURE_COUNTER = 
"compaction_cleaner_failure_counter";
   public static final String COMPACTION_WORKER_CYCLE = 
"compaction_worker_cycle";
 
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 4a9a6ef..9cece8a 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -325,9 +325,9 @@ class CompactionTxnHandler extends TxnHandler {
   public List<CompactionInfo> findReadyToClean(long minOpenTxnWaterMark, long 
retentionTime) throws MetaException {
     try {
       List<CompactionInfo> rc = new ArrayList<>();
-      
+
       try (Connection dbConn = 
getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-           Statement stmt = dbConn.createStatement()) {
+          Statement stmt = dbConn.createStatement()) {
         /*
          * By filtering on minOpenTxnWaterMark, we will only cleanup after 
every transaction is committed, that could see
          * the uncompacted deltas. This way the cleaner can clean up 
everything that was made obsolete by this compaction.
@@ -340,20 +340,20 @@ class CompactionTxnHandler extends TxnHandler {
           whereClause += " AND \"CQ_COMMIT_TIME\" < (" + getEpochFn(dbProduct) 
+ " - " + retentionTime + ")";
         }
         String s = "SELECT \"CQ_ID\", \"cq1\".\"CQ_DATABASE\", 
\"cq1\".\"CQ_TABLE\", \"cq1\".\"CQ_PARTITION\"," +
-          "   \"CQ_TYPE\", \"CQ_RUN_AS\", \"CQ_HIGHEST_WRITE_ID\", 
\"CQ_TBLPROPERTIES\"" +
-          "  FROM \"COMPACTION_QUEUE\" \"cq1\" " +
-          "INNER JOIN (" +
-          "  SELECT MIN(\"CQ_HIGHEST_WRITE_ID\") \"WRITE_ID\", 
\"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\"" +
-          "  FROM \"COMPACTION_QUEUE\"" 
-          + whereClause + 
-          "  GROUP BY \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\") \"cq2\" 
" +
-          "ON \"cq1\".\"CQ_DATABASE\" = \"cq2\".\"CQ_DATABASE\""+
-          "  AND \"cq1\".\"CQ_TABLE\" = \"cq2\".\"CQ_TABLE\""+
-          "  AND (\"cq1\".\"CQ_PARTITION\" = \"cq2\".\"CQ_PARTITION\"" +
-          "    OR \"cq1\".\"CQ_PARTITION\" IS NULL AND 
\"cq2\".\"CQ_PARTITION\" IS NULL)"
-          + whereClause + 
-          "  AND \"CQ_HIGHEST_WRITE_ID\" = \"WRITE_ID\"" +
-          "  ORDER BY \"CQ_ID\"";
+            "   \"CQ_TYPE\", \"CQ_RUN_AS\", \"CQ_HIGHEST_WRITE_ID\", 
\"CQ_TBLPROPERTIES\"" +
+            "  FROM \"COMPACTION_QUEUE\" \"cq1\" " +
+            "INNER JOIN (" +
+            "  SELECT MIN(\"CQ_HIGHEST_WRITE_ID\") \"WRITE_ID\", 
\"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\"" +
+            "  FROM \"COMPACTION_QUEUE\""
+            + whereClause +
+            "  GROUP BY \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\") 
\"cq2\" " +
+            "ON \"cq1\".\"CQ_DATABASE\" = \"cq2\".\"CQ_DATABASE\""+
+            "  AND \"cq1\".\"CQ_TABLE\" = \"cq2\".\"CQ_TABLE\""+
+            "  AND (\"cq1\".\"CQ_PARTITION\" = \"cq2\".\"CQ_PARTITION\"" +
+            "    OR \"cq1\".\"CQ_PARTITION\" IS NULL AND 
\"cq2\".\"CQ_PARTITION\" IS NULL)"
+            + whereClause +
+            "  AND \"CQ_HIGHEST_WRITE_ID\" = \"WRITE_ID\"" +
+            "  ORDER BY \"CQ_ID\"";
         LOG.debug("Going to execute query <" + s + ">");
 
         try (ResultSet rs = stmt.executeQuery(s)) {
@@ -377,13 +377,110 @@ class CompactionTxnHandler extends TxnHandler {
         LOG.error("Unable to select next element for cleaning, " + 
e.getMessage());
         checkRetryable(e, "findReadyToClean");
         throw new MetaException("Unable to connect to transaction database " +
-          StringUtils.stringifyException(e));
-      } 
+            StringUtils.stringifyException(e));
+      }
     } catch (RetryException e) {
       return findReadyToClean(minOpenTxnWaterMark, retentionTime);
     }
   }
 
+
+  /**
+   * Mark the cleaning start time for a particular compaction
+   *
+   * @param info info on the compaction entry
+   */
+  @Override
+  @RetrySemantics.CannotRetry
+  public void markCleanerStart(CompactionInfo info) throws MetaException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Running markCleanerStart with CompactionInfo: " + 
info.toString());
+    }
+
+    try {
+      Connection dbConn = null;
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        long now = getDbTime(dbConn);
+        setCleanerStart(dbConn, info, now);
+      } catch (SQLException e) {
+        LOG.error("Unable to set the cleaner start time for compaction record: " + e.getMessage());
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(e, "markCleanerStart(" + info + ")");
+        throw new MetaException("Unable to connect to transaction database " + 
StringUtils.stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      markCleanerStart(info);
+    }
+  }
+
+  /**
+   * Removes the cleaning start time for a particular compaction
+   *
+   * @param info info on the compaction entry
+   */
+  @Override
+  @RetrySemantics.CannotRetry
+  public void clearCleanerStart(CompactionInfo info) throws MetaException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Running clearCleanerStart with CompactionInfo: " + 
info.toString());
+    }
+
+    try {
+      Connection dbConn = null;
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        setCleanerStart(dbConn, info, -1L);
+      } catch (SQLException e) {
+        LOG.error("Unable to clear the cleaner start time for compaction record: " + e.getMessage());
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(e, "clearCleanerStart(" + info + ")");
+        throw new MetaException("Unable to connect to transaction database " + 
StringUtils.stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      clearCleanerStart(info);
+    }
+  }
+
+  private void setCleanerStart(Connection dbConn, CompactionInfo info, long timestamp)
+      throws SQLException {
+    long id = info.id;
+    PreparedStatement pStmt = null;
+    ResultSet rs = null;
+    try {
+      String query = "" +
+          " UPDATE " +
+          "  \"COMPACTION_QUEUE\" " +
+          " SET " +
+          "  \"CQ_CLEANER_START\" = " + timestamp +
+          " WHERE " +
+          "  \"CQ_ID\" = " + id +
+          " AND " +
+          "  \"CQ_STATE\"='" + READY_FOR_CLEANING + "'";
+
+      pStmt = dbConn.prepareStatement(query);
+      LOG.debug("Going to execute update <" + query + "> for CQ_ID=" + id);
+      int updCount = pStmt.executeUpdate();
+      if (updCount != 1) {
+        LOG.error("Unable to update compaction record: " + info + ".  Update 
count=" + updCount);
+        LOG.debug("Going to rollback");
+        dbConn.rollback();
+      } else {
+        LOG.debug("Going to commit");
+        dbConn.commit();
+      }
+    } finally {
+      close(rs);
+      closeStmt(pStmt);
+    }
+  }
+
   /**
    * This will remove an entry from the queue after
    * it has been compacted.
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 0d7d573..c9c09ad 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -3703,6 +3703,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
         return Character.toString(s);
     }
   }
+
   @RetrySemantics.ReadOnly
   public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws 
MetaException {
     ShowCompactResponse response = new ShowCompactResponse(new ArrayList<>());
@@ -3712,15 +3713,23 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
       try {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
-        String s = "SELECT \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", 
\"CQ_STATE\", \"CQ_TYPE\", \"CQ_WORKER_ID\", " +
-          //-1 because 'null' literal doesn't work for all DBs...
-          "\"CQ_START\", -1 \"CC_END\", \"CQ_RUN_AS\", \"CQ_HADOOP_JOB_ID\", 
\"CQ_ID\", \"CQ_ERROR_MESSAGE\", " +
-          "\"CQ_ENQUEUE_TIME\", \"CQ_WORKER_VERSION\", \"CQ_INITIATOR_ID\", 
\"CQ_INITIATOR_VERSION\" " +
-          "FROM \"COMPACTION_QUEUE\" UNION ALL " +
-          "SELECT \"CC_DATABASE\", \"CC_TABLE\", \"CC_PARTITION\", 
\"CC_STATE\", \"CC_TYPE\", \"CC_WORKER_ID\", " +
-          "\"CC_START\", \"CC_END\", \"CC_RUN_AS\", \"CC_HADOOP_JOB_ID\", 
\"CC_ID\", \"CC_ERROR_MESSAGE\", " +
-          "\"CC_ENQUEUE_TIME\", \"CC_WORKER_VERSION\", \"CC_INITIATOR_ID\", 
\"CC_INITIATOR_VERSION\" " +
-          " FROM \"COMPLETED_COMPACTIONS\""; //todo: sort by cq_id?
+        String s = "" +
+            //-1 because 'null' literal doesn't work for all DBs...
+            "SELECT " +
+            "  \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", \"CQ_STATE\", 
\"CQ_TYPE\", \"CQ_WORKER_ID\", " +
+            "  \"CQ_START\", -1 \"CC_END\", \"CQ_RUN_AS\", 
\"CQ_HADOOP_JOB_ID\", \"CQ_ID\", \"CQ_ERROR_MESSAGE\", " +
+            "  \"CQ_ENQUEUE_TIME\", \"CQ_WORKER_VERSION\", 
\"CQ_INITIATOR_ID\", \"CQ_INITIATOR_VERSION\", " +
+            "  \"CQ_CLEANER_START\"" +
+            "FROM " +
+            "  \"COMPACTION_QUEUE\" " +
+            "UNION ALL " +
+            "SELECT " +
+            "  \"CC_DATABASE\", \"CC_TABLE\", \"CC_PARTITION\", \"CC_STATE\", 
\"CC_TYPE\", \"CC_WORKER_ID\", " +
+            "  \"CC_START\", \"CC_END\", \"CC_RUN_AS\", \"CC_HADOOP_JOB_ID\", 
\"CC_ID\", \"CC_ERROR_MESSAGE\", " +
+            "  \"CC_ENQUEUE_TIME\", \"CC_WORKER_VERSION\", 
\"CC_INITIATOR_ID\", \"CC_INITIATOR_VERSION\", " +
+            "  -1 " +
+            "FROM " +
+            "  \"COMPLETED_COMPACTIONS\""; //todo: sort by cq_id?
         //what I want is order by cc_end desc, cc_start asc (but derby has a 
bug https://issues.apache.org/jira/browse/DERBY-6013)
         //to sort so that currently running jobs are at the end of the list 
(bottom of screen)
         //and currently running ones are in sorted by start time
@@ -3740,11 +3749,11 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
           }
           e.setWorkerid(rs.getString(6));
           long start = rs.getLong(7);
-          if(!rs.wasNull()) {
+          if (!rs.wasNull()) {
             e.setStart(start);
           }
           long endTime = rs.getLong(8);
-          if(endTime != -1) {
+          if (endTime != -1) {
             e.setEndTime(endTime);
           }
           e.setRunAs(rs.getString(9));
@@ -3752,18 +3761,22 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
           e.setId(rs.getLong(11));
           e.setErrorMessage(rs.getString(12));
           long enqueueTime = rs.getLong(13);
-          if(!rs.wasNull()) {
+          if (!rs.wasNull()) {
             e.setEnqueueTime(enqueueTime);
           }
           e.setWorkerVersion(rs.getString(14));
           e.setInitiatorId(rs.getString(15));
           e.setInitiatorVersion(rs.getString(16));
+          long cleanerStart = rs.getLong(17);
+          if (!rs.wasNull() && (cleanerStart != -1)) {
+            e.setCleanerStart(cleanerStart);
+          }
           response.addToCompacts(e);
         }
       } catch (SQLException e) {
         checkRetryable(e, "showCompact(" + rqst + ")");
         throw new MetaException("Unable to select from transaction database " +
-          StringUtils.stringifyException(e));
+            StringUtils.stringifyException(e));
       } finally {
         closeStmt(stmt);
         closeDbConn(dbConn);
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index a0af4a8..d325765 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -24,8 +24,52 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.classification.RetrySemantics;
-import org.apache.hadoop.hive.metastore.api.*;
-import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
+import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
+import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
+import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
+import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse;
+import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionResponse;
+import org.apache.hadoop.hive.metastore.api.CreationMetadata;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FindNextCompactRequest;
+import 
org.apache.hadoop.hive.metastore.api.GetLatestCommittedCompactionInfoRequest;
+import 
org.apache.hadoop.hive.metastore.api.GetLatestCommittedCompactionInfoResponse;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
+import org.apache.hadoop.hive.metastore.api.HeartbeatRequest;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
+import org.apache.hadoop.hive.metastore.api.HiveObjectType;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.Materialization;
+import org.apache.hadoop.hive.metastore.api.MaxAllocatedTableWriteIdRequest;
+import org.apache.hadoop.hive.metastore.api.MaxAllocatedTableWriteIdResponse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
+import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.ReplTblWriteIdStateRequest;
+import org.apache.hadoop.hive.metastore.api.SeedTableWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.SeedTxnIdRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnOpenException;
+import org.apache.hadoop.hive.metastore.api.TxnType;
+import org.apache.hadoop.hive.metastore.api.UnlockRequest;
+import org.apache.hadoop.hive.metastore.api.UpdateTransactionalStatsRequest;
 import org.apache.hadoop.hive.metastore.events.ListenerEvent;
 
 import java.sql.SQLException;
@@ -382,7 +426,7 @@ public interface TxnStore extends Configurable {
    * @param compactionTxnId - txnid in which Compactor is running
    */
   @RetrySemantics.Idempotent
-  public void updateCompactorState(CompactionInfo ci, long compactionTxnId) 
throws MetaException;
+  void updateCompactorState(CompactionInfo ci, long compactionTxnId) throws 
MetaException;
 
   /**
    * This will grab the next compaction request off of
@@ -423,6 +467,22 @@ public interface TxnStore extends Configurable {
   List<CompactionInfo> findReadyToClean(long minOpenTxnWaterMark, long 
retentionTime) throws MetaException;
 
   /**
+   * Sets the cleaning start time for a particular compaction
+   *
+   * @param info info on the compaction entry
+   */
+  @RetrySemantics.CannotRetry
+  void markCleanerStart(CompactionInfo info) throws MetaException;
+
+  /**
+   * Removes the cleaning start time for a particular compaction
+   *
+   * @param info info on the compaction entry
+   */
+  @RetrySemantics.CannotRetry
+  void clearCleanerStart(CompactionInfo info) throws MetaException;
+
+  /**
    * This will remove an entry from the queue after
    * it has been compacted.
    *
diff --git 
a/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
 
b/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
index 0d4ad17..5c49580 100644
--- 
a/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
+++ 
b/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
@@ -628,7 +628,8 @@ CREATE TABLE COMPACTION_QUEUE (
   CQ_COMMIT_TIME bigint,
   CQ_INITIATOR_ID varchar(128),
   CQ_INITIATOR_VERSION varchar(128),
-  CQ_WORKER_VERSION varchar(128)
+  CQ_WORKER_VERSION varchar(128),
+  CQ_CLEANER_START bigint
 );
 
 CREATE TABLE NEXT_COMPACTION_QUEUE_ID (
diff --git 
a/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
 
b/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
index 86c49c4..37e42d8 100644
--- 
a/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
+++ 
b/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
@@ -182,5 +182,8 @@ ALTER TABLE "APP"."MV_TABLES_USED" ADD COLUMN 
"UPDATED_COUNT" BIGINT NOT NULL DE
 ALTER TABLE "APP"."MV_TABLES_USED" ADD COLUMN "DELETED_COUNT" BIGINT NOT NULL 
DEFAULT 0;
 ALTER TABLE "APP"."MV_TABLES_USED" ADD CONSTRAINT "MV_TABLES_USED_PK" PRIMARY 
KEY ("TBL_ID", "MV_CREATION_METADATA_ID");
 
+-- HIVE-25737
+ALTER TABLE COMPACTION_QUEUE ADD CQ_CLEANER_START bigint;
+
 -- This needs to be the last thing done.  Insert any changes above this line.
 UPDATE "APP".VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release 
version 4.0.0' where VER_ID=1;
diff --git 
a/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
 
b/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
index 6a3b174..72a402a 100644
--- 
a/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
+++ 
b/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
@@ -1048,6 +1048,7 @@ CREATE TABLE COMPACTION_QUEUE(
     CQ_INITIATOR_ID nvarchar(128) NULL,
     CQ_INITIATOR_VERSION nvarchar(128) NULL,
     CQ_WORKER_VERSION nvarchar(128) NULL,
+    CQ_CLEANER_START bigint NULL,
 PRIMARY KEY CLUSTERED
 (
        CQ_ID ASC
diff --git 
a/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
 
b/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
index 296cca5..5460d7a 100644
--- 
a/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
+++ 
b/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
@@ -233,6 +233,9 @@ ALTER TABLE "MV_TABLES_USED" ADD "UPDATED_COUNT" BIGINT NOT 
NULL DEFAULT 0;
 ALTER TABLE "MV_TABLES_USED" ADD "DELETED_COUNT" BIGINT NOT NULL DEFAULT 0;
 ALTER TABLE "MV_TABLES_USED" ADD CONSTRAINT "MV_TABLES_USED_PK" PRIMARY KEY 
("TBL_ID", "MV_CREATION_METADATA_ID");
 
+-- HIVE-25737
+ALTER TABLE COMPACTION_QUEUE ADD CQ_CLEANER_START bigint NULL;
+
 -- These lines need to be last.  Insert any changes above.
 UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release 
version 4.0.0' where VER_ID=1;
 SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS MESSAGE;
diff --git 
a/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
 
b/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
index 0cbc091..2ab4cf6 100644
--- 
a/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
+++ 
b/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
@@ -1089,7 +1089,8 @@ CREATE TABLE COMPACTION_QUEUE (
   CQ_COMMIT_TIME bigint,
   CQ_INITIATOR_ID varchar(128),
   CQ_INITIATOR_VERSION varchar(128),
-  CQ_WORKER_VERSION varchar(128)
+  CQ_WORKER_VERSION varchar(128),
+  CQ_CLEANER_START bigint
 ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
 
 CREATE TABLE COMPLETED_COMPACTIONS (
diff --git 
a/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
 
b/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
index 1d3a1f06..89b6a91 100644
--- 
a/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
+++ 
b/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
@@ -210,6 +210,9 @@ ALTER TABLE `MV_TABLES_USED` ADD COLUMN `UPDATED_COUNT` 
bigint(20) NOT NULL DEFA
 ALTER TABLE `MV_TABLES_USED` ADD COLUMN `DELETED_COUNT` bigint(20) NOT NULL 
DEFAULT 0;
 ALTER TABLE `MV_TABLES_USED` ADD CONSTRAINT `MV_TABLES_USED_PK` PRIMARY KEY 
(`TBL_ID`, `MV_CREATION_METADATA_ID`);
 
+-- HIVE-25737
+ALTER TABLE COMPACTION_QUEUE ADD CQ_CLEANER_START bigint;
+
 -- These lines need to be last.  Insert any changes above.
 UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release 
version 4.0.0' where VER_ID=1;
 SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS MESSAGE;
diff --git 
a/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
 
b/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
index a208154..055f101 100644
--- 
a/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
+++ 
b/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
@@ -1091,7 +1091,8 @@ CREATE TABLE COMPACTION_QUEUE (
   CQ_COMMIT_TIME NUMBER(19),
   CQ_INITIATOR_ID varchar(128),
   CQ_INITIATOR_VERSION varchar(128),
-  CQ_WORKER_VERSION varchar(128)
+  CQ_WORKER_VERSION varchar(128),
+  CQ_CLEANER_START NUMBER(19)
 ) ROWDEPENDENCIES;
 
 CREATE TABLE NEXT_COMPACTION_QUEUE_ID (
diff --git 
a/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
 
b/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
index d5810de..cce71a3 100644
--- 
a/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
+++ 
b/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
@@ -207,7 +207,9 @@ ALTER TABLE MV_TABLES_USED ADD UPDATED_COUNT NUMBER DEFAULT 0 NOT NULL;
 ALTER TABLE MV_TABLES_USED ADD DELETED_COUNT NUMBER DEFAULT 0 NOT NULL;
 ALTER TABLE MV_TABLES_USED ADD CONSTRAINT MV_TABLES_USED_PK PRIMARY KEY (TBL_ID, MV_CREATION_METADATA_ID);
 
+-- HIVE-25737
+ALTER TABLE COMPACTION_QUEUE ADD CQ_CLEANER_START NUMBER(19);
+
 -- These lines need to be last.  Insert any changes above.
 UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1;
 SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS Status from dual;
-
diff --git 
a/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
 
b/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
index 6e83ff1..639f18d 100644
--- 
a/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
+++ 
b/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
@@ -1800,7 +1800,8 @@ CREATE TABLE "COMPACTION_QUEUE" (
   "CQ_COMMIT_TIME" bigint,
   "CQ_INITIATOR_ID" varchar(128),
   "CQ_INITIATOR_VERSION" varchar(128),
-  "CQ_WORKER_VERSION" varchar(128)
+  "CQ_WORKER_VERSION" varchar(128),
+  "CQ_CLEANER_START" bigint
 );
 
 CREATE TABLE "NEXT_COMPACTION_QUEUE_ID" (
diff --git 
a/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
 
b/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
index 83f8336..a2c0a81 100644
--- 
a/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
+++ 
b/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
@@ -344,7 +344,9 @@ ALTER TABLE "MV_TABLES_USED" ADD "UPDATED_COUNT" bigint NOT NULL DEFAULT 0;
 ALTER TABLE "MV_TABLES_USED" ADD "DELETED_COUNT" bigint NOT NULL DEFAULT 0;
 ALTER TABLE "MV_TABLES_USED" ADD CONSTRAINT "MV_TABLES_USED_PK" PRIMARY KEY ("TBL_ID", "MV_CREATION_METADATA_ID");
 
+-- HIVE-25737
+ALTER TABLE "COMPACTION_QUEUE" ADD "CQ_CLEANER_START" bigint;
+
 -- These lines need to be last. Insert any changes above.
 UPDATE "VERSION" SET "SCHEMA_VERSION"='4.0.0', "VERSION_COMMENT"='Hive release version 4.0.0' where "VER_ID"=1;
 SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0';
-
diff --git 
a/standalone-metastore/metastore-server/src/test/resources/sql/postgres/upgrade-3.1.3000-to-4.0.0.postgres.sql
 
b/standalone-metastore/metastore-server/src/test/resources/sql/postgres/upgrade-3.1.3000-to-4.0.0.postgres.sql
index 21f8061..e6507bd 100644
--- 
a/standalone-metastore/metastore-server/src/test/resources/sql/postgres/upgrade-3.1.3000-to-4.0.0.postgres.sql
+++ 
b/standalone-metastore/metastore-server/src/test/resources/sql/postgres/upgrade-3.1.3000-to-4.0.0.postgres.sql
@@ -115,6 +115,9 @@ ALTER TABLE "DBS" ADD "DATACONNECTOR_NAME" character varying(128);
 ALTER TABLE "DBS" ADD "REMOTE_DBNAME" character varying(128);
 UPDATE "DBS" SET "TYPE"= 'NATIVE' WHERE "TYPE" IS NULL;
 
+-- HIVE-25737
+ALTER TABLE "COMPACTION_QUEUE" ADD "CQ_CLEANER_START" bigint;
+
 -- These lines need to be last. Insert any changes above.
 UPDATE "VERSION" SET "SCHEMA_VERSION"='4.0.0', "VERSION_COMMENT"='Hive release version 4.0.0' where "VER_ID"=1;
 SELECT 'Finished upgrading MetaStore schema from 3.1.3000 to 4.0.0';

Reply via email to