This is an automated email from the ASF dual-hosted git repository.
klcopp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 6a53540 HIVE-25737: Compaction Observability:
Initiator/Worker/Cleaner cycle measurement improvements (Viktor Csomor,
reviewed by Laszlo Pinter and Karen Coppage)
6a53540 is described below
commit 6a535403a49b972bb0a488d65bab06c57e3c8c66
Author: Viktor Csomor <[email protected]>
AuthorDate: Thu Dec 9 09:32:33 2021 +0100
HIVE-25737: Compaction Observability: Initiator/Worker/Cleaner cycle
measurement improvements (Viktor Csomor, reviewed by Laszlo Pinter and Karen
Coppage)
Closes #2827.
A daemon thread has been implemented for the Initiator that measures the
elapsed time since its start.
The PerformanceLogger approach is also kept but the metrics intended to use
the gauge style
The Age of Oldest Working Compaction metric has been implemented in the
AcidMetricService
The Age of Oldest active Cleaner metric has been implemented
- CQ_CLEANER_START field added to the COMPACTION_QUEUE table
- COMPACTION_OLDEST_CLEANING_AGE metric has been added
- markCleaning method added to the CompactionTxnHandler
- ShowCompactResponseElement extended with the cleanerStart field
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 16 ++
.../upgrade/hive/hive-schema-4.0.0.hive.sql | 21 ++-
.../upgrade/hive/upgrade-3.1.0-to-4.0.0.hive.sql | 21 ++-
.../hadoop/hive/ql/txn/compactor/Cleaner.java | 31 ++++
.../hadoop/hive/ql/txn/compactor/Initiator.java | 55 +++++-
.../ql/txn/compactor/MetaStoreCompactorThread.java | 39 +++++
.../ql/txn/compactor/TestCompactionMetrics.java | 184 +++++++++++++++++----
.../test/results/clientpositive/llap/sysdb.q.out | 11 +-
.../gen/thrift/gen-cpp/hive_metastore_types.cpp | 22 +++
.../src/gen/thrift/gen-cpp/hive_metastore_types.h | 12 +-
.../metastore/api/ShowCompactResponseElement.java | 106 +++++++++++-
.../metastore/ShowCompactResponseElement.php | 24 +++
.../src/gen/thrift/gen-py/hive_metastore/ttypes.py | 14 +-
.../src/gen/thrift/gen-rb/hive_metastore_types.rb | 4 +-
.../hadoop/hive/metastore/conf/MetastoreConf.java | 12 ++
.../src/main/thrift/hive_metastore.thrift | 3 +-
.../hive/metastore/metrics/AcidMetricService.java | 71 +++++---
.../hive/metastore/metrics/MetricsConstants.java | 4 +
.../hive/metastore/txn/CompactionTxnHandler.java | 133 +++++++++++++--
.../hadoop/hive/metastore/txn/TxnHandler.java | 39 +++--
.../apache/hadoop/hive/metastore/txn/TxnStore.java | 66 +++++++-
.../src/main/sql/derby/hive-schema-4.0.0.derby.sql | 3 +-
.../sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql | 3 +
.../src/main/sql/mssql/hive-schema-4.0.0.mssql.sql | 1 +
.../sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql | 3 +
.../src/main/sql/mysql/hive-schema-4.0.0.mysql.sql | 3 +-
.../sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql | 3 +
.../main/sql/oracle/hive-schema-4.0.0.oracle.sql | 3 +-
.../sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql | 4 +-
.../sql/postgres/hive-schema-4.0.0.postgres.sql | 3 +-
.../postgres/upgrade-3.2.0-to-4.0.0.postgres.sql | 4 +-
.../upgrade-3.1.3000-to-4.0.0.postgres.sql | 3 +
32 files changed, 800 insertions(+), 121 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 988cec8..d9eec2e 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3190,6 +3190,22 @@ public class HiveConf extends Configuration {
"has had a transaction done on it since the last major compaction. So
decreasing this\n" +
"value will increase the load on the NameNode."),
+
HIVE_COMPACTOR_INITIATOR_DURATION_UPDATE_INTERVAL("hive.compactor.initiator.duration.update.interval",
"60s",
+ new TimeValidator(TimeUnit.SECONDS),
+ "Time in seconds that drives the update interval of
compaction_initiator_duration metric.\n" +
+ "Smaller value results in a fine grained metric update.\n" +
+ "This updater can be turned off if its value less than or equals
to zero.\n"+
+ "In this case the above metric will be update only after the
initiator completed one cycle.\n" +
+ "The hive.compactor.initiator.on must be turned on (true) in-order
to enable the Initiator,\n" +
+ "otherwise this setting has no effect."),
+
+
HIVE_COMPACTOR_CLEANER_DURATION_UPDATE_INTERVAL("hive.compactor.cleaner.duration.update.interval",
"60s",
+ new TimeValidator(TimeUnit.SECONDS),
+ "Time in seconds that drives the update interval of
compaction_cleaner_duration metric.\n" +
+ "Smaller value results in a fine grained metric update.\n" +
+ "This updater can be turned off if its value less than or equals
to zero.\n"+
+ "In this case the above metric will be update only after the
cleaner completed one cycle."),
+
HIVE_COMPACTOR_REQUEST_QUEUE("hive.compactor.request.queue", 1,
"Enables parallelization of the checkForCompaction operation, that
includes many file metadata checks\n" +
"and may be expensive"),
diff --git a/metastore/scripts/upgrade/hive/hive-schema-4.0.0.hive.sql
b/metastore/scripts/upgrade/hive/hive-schema-4.0.0.hive.sql
index 9de3488..d0654a5 100644
--- a/metastore/scripts/upgrade/hive/hive-schema-4.0.0.hive.sql
+++ b/metastore/scripts/upgrade/hive/hive-schema-4.0.0.hive.sql
@@ -1092,7 +1092,8 @@ CREATE EXTERNAL TABLE IF NOT EXISTS `COMPACTION_QUEUE` (
`CQ_ERROR_MESSAGE` string,
`CQ_INITIATOR_ID` string,
`CQ_INITIATOR_VERSION` string,
- `CQ_WORKER_VERSION` string
+ `CQ_WORKER_VERSION` string,
+ `CQ_CLEANER_START` bigint
)
STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
TBLPROPERTIES (
@@ -1115,7 +1116,8 @@ TBLPROPERTIES (
\"COMPACTION_QUEUE\".\"CQ_ERROR_MESSAGE\",
\"COMPACTION_QUEUE\".\"CQ_INITIATOR_ID\",
\"COMPACTION_QUEUE\".\"CQ_INITIATOR_VERSION\",
- \"COMPACTION_QUEUE\".\"CQ_WORKER_VERSION\"
+ \"COMPACTION_QUEUE\".\"CQ_WORKER_VERSION\",
+ \"COMPACTION_QUEUE\".\"CQ_CLEANER_START\"
FROM \"COMPACTION_QUEUE\"
"
);
@@ -1187,7 +1189,8 @@ CREATE OR REPLACE VIEW `COMPACTIONS`
`C_HIGHEST_WRITE_ID`,
`C_INITIATOR_HOST`,
`C_INITIATOR_ID`,
- `C_INITIATOR_VERSION`
+ `C_INITIATOR_VERSION`,
+ `C_CLEANER_START`
) AS
SELECT
CC_ID,
@@ -1209,7 +1212,8 @@ SELECT
CC_HIGHEST_WRITE_ID,
CASE WHEN CC_INITIATOR_ID IS NULL THEN cast (null as string) ELSE
split(CC_INITIATOR_ID,"-")[0] END,
CASE WHEN CC_INITIATOR_ID IS NULL THEN cast (null as string) ELSE
split(CC_INITIATOR_ID,"-")[size(split(CC_INITIATOR_ID,"-"))-1] END,
- CC_INITIATOR_VERSION
+ CC_INITIATOR_VERSION,
+ NULL
FROM COMPLETED_COMPACTIONS
UNION ALL
SELECT
@@ -1231,7 +1235,8 @@ SELECT
CQ_HIGHEST_WRITE_ID,
CASE WHEN CQ_INITIATOR_ID IS NULL THEN NULL ELSE
split(CQ_INITIATOR_ID,"-")[0] END,
CASE WHEN CQ_INITIATOR_ID IS NULL THEN NULL ELSE
split(CQ_INITIATOR_ID,"-")[size(split(CQ_INITIATOR_ID,"-"))-1] END,
- CQ_INITIATOR_VERSION
+ CQ_INITIATOR_VERSION,
+ CQ_CLEANER_START
FROM COMPACTION_QUEUE;
CREATE EXTERNAL TABLE IF NOT EXISTS `SCHEDULED_QUERIES` (
@@ -1874,7 +1879,8 @@ CREATE OR REPLACE VIEW `COMPACTIONS`
`C_HIGHEST_WRITE_ID`,
`C_INITIATOR_HOST`,
`C_INITIATOR_ID`,
- `C_INITIATOR_VERSION`
+ `C_INITIATOR_VERSION`,
+ `C_CLEANER_START`
) AS
SELECT DISTINCT
C_ID,
@@ -1895,7 +1901,8 @@ SELECT DISTINCT
C_HIGHEST_WRITE_ID,
C_INITIATOR_HOST,
C_INITIATOR_ID,
- C_INITIATOR_VERSION
+ C_INITIATOR_VERSION,
+ C_CLEANER_START
FROM
`sys`.`COMPACTIONS` C JOIN `sys`.`TBLS` T ON (C.`C_TABLE` = T.`TBL_NAME`)
JOIN `sys`.`DBS` D ON (C.`C_DATABASE` = D.`NAME`)
diff --git a/metastore/scripts/upgrade/hive/upgrade-3.1.0-to-4.0.0.hive.sql
b/metastore/scripts/upgrade/hive/upgrade-3.1.0-to-4.0.0.hive.sql
index 6e3a9e1..fb76575 100644
--- a/metastore/scripts/upgrade/hive/upgrade-3.1.0-to-4.0.0.hive.sql
+++ b/metastore/scripts/upgrade/hive/upgrade-3.1.0-to-4.0.0.hive.sql
@@ -210,7 +210,8 @@ CREATE EXTERNAL TABLE IF NOT EXISTS `COMPACTION_QUEUE` (
`CQ_ERROR_MESSAGE` string,
`CQ_INITIATOR_ID` string,
`CQ_INITIATOR_VERSION` string,
- `CQ_WORKER_VERSION` string
+ `CQ_WORKER_VERSION` string,
+ `CQ_CLEANER_START` bigint
)
STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
TBLPROPERTIES (
@@ -233,7 +234,8 @@ TBLPROPERTIES (
\"COMPACTION_QUEUE\".\"CQ_ERROR_MESSAGE\",
\"COMPACTION_QUEUE\".\"CQ_INITIATOR_ID\",
\"COMPACTION_QUEUE\".\"CQ_INITIATOR_VERSION\",
- \"COMPACTION_QUEUE\".\"CQ_WORKER_VERSION\"
+ \"COMPACTION_QUEUE\".\"CQ_WORKER_VERSION\",
+ \"COMPACTION_QUEUE\".\"CQ_CLEANER_START\"
FROM \"COMPACTION_QUEUE\"
"
);
@@ -305,7 +307,8 @@ CREATE OR REPLACE VIEW `COMPACTIONS`
`C_HIGHEST_WRITE_ID`,
`C_INITIATOR_HOST`,
`C_INITIATOR_ID`,
- `C_INITIATOR_VERSION`
+ `C_INITIATOR_VERSION`,
+ `C_CLEANER_START`
) AS
SELECT
CC_ID,
@@ -327,7 +330,8 @@ SELECT
CC_HIGHEST_WRITE_ID,
CASE WHEN CC_INITIATOR_ID IS NULL THEN cast (null as string) ELSE
split(CC_INITIATOR_ID,"-")[0] END,
CASE WHEN CC_INITIATOR_ID IS NULL THEN cast (null as string) ELSE
split(CC_INITIATOR_ID,"-")[size(split(CC_INITIATOR_ID,"-"))-1] END,
- CC_INITIATOR_VERSION
+ CC_INITIATOR_VERSION,
+ NULL
FROM COMPLETED_COMPACTIONS
UNION ALL
SELECT
@@ -349,7 +353,8 @@ SELECT
CQ_HIGHEST_WRITE_ID,
CASE WHEN CQ_INITIATOR_ID IS NULL THEN NULL ELSE
split(CQ_INITIATOR_ID,"-")[0] END,
CASE WHEN CQ_INITIATOR_ID IS NULL THEN NULL ELSE
split(CQ_INITIATOR_ID,"-")[size(split(CQ_INITIATOR_ID,"-"))-1] END,
- CQ_INITIATOR_VERSION
+ CQ_INITIATOR_VERSION,
+ CQ_CLEANER_START
FROM COMPACTION_QUEUE;
-- HIVE-22553
@@ -841,7 +846,8 @@ CREATE OR REPLACE VIEW `COMPACTIONS`
`C_HIGHEST_WRITE_ID`,
`C_INITIATOR_HOST`,
`C_INITIATOR_ID`,
- `C_INITIATOR_VERSION`
+ `C_INITIATOR_VERSION`,
+ `C_CLEANER_START`
) AS
SELECT DISTINCT
C_ID,
@@ -862,7 +868,8 @@ SELECT DISTINCT
C_HIGHEST_WRITE_ID,
C_INITIATOR_HOST,
C_INITIATOR_ID,
- C_INITIATOR_VERSION
+ C_INITIATOR_VERSION,
+ C_CLEANER_START
FROM
`sys`.`COMPACTIONS` C JOIN `sys`.`TBLS` T ON (C.`C_TABLE` = T.`TBL_NAME`)
JOIN `sys`.`DBS` D ON (C.`C_DATABASE` = D.`NAME`)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 8149494..38f4fec 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -113,6 +113,14 @@ public class Cleaner extends MetaStoreCompactorThread {
try {
handle =
txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
startedAt = System.currentTimeMillis();
+
+ if (metricsEnabled) {
+ stopCycleUpdater();
+ startCycleUpdater(HiveConf.getTimeVar(conf,
+
HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_DURATION_UPDATE_INTERVAL,
TimeUnit.MILLISECONDS),
+ new
CleanerCycleUpdater(MetricsConstants.COMPACTION_CLEANER_CYCLE_DURATION,
startedAt));
+ }
+
long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
List<CompactionInfo> readyToClean =
txnHandler.findReadyToClean(minOpenTxnId, retentionTime);
@@ -145,6 +153,10 @@ public class Cleaner extends MetaStoreCompactorThread {
if (handle != null) {
handle.releaseLocks();
}
+ if (metricsEnabled) {
+
updateCycleDurationMetric(MetricsConstants.COMPACTION_INITIATOR_CYCLE_DURATION,
startedAt);
+ }
+ stopCycleUpdater();
}
// Now, go back to bed until it's time to do this again
long elapsedTime = System.currentTimeMillis() - startedAt;
@@ -204,6 +216,9 @@ public class Cleaner extends MetaStoreCompactorThread {
return;
}
}
+
+ txnHandler.markCleanerStart(ci);
+
StorageDescriptor sd = resolveStorageDescriptor(t, p);
final String location = sd.getLocation();
ValidTxnList validTxnList =
@@ -266,6 +281,7 @@ public class Cleaner extends MetaStoreCompactorThread {
if (removedFiles.value || isDynPartAbort(t, ci)) {
txnHandler.markCleaned(ci);
} else {
+ txnHandler.clearCleanerStart(ci);
LOG.warn("No files were removed. Leaving queue entry " + ci + " in
ready for cleaning state.");
}
} catch (Exception e) {
@@ -371,4 +387,19 @@ public class Cleaner extends MetaStoreCompactorThread {
}
return true;
}
+
+ private static class CleanerCycleUpdater implements Runnable {
+ private final String metric;
+ private final long startedAt;
+
+ CleanerCycleUpdater(String metric, long startedAt) {
+ this.metric = metric;
+ this.startedAt = startedAt;
+ }
+
+ @Override
+ public void run() {
+ updateCycleDurationMetric(metric, startedAt);
+ }
+ }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index adebd31..bffa773 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -121,13 +121,24 @@ public class Initiator extends MetaStoreCompactorThread {
// don't doom the entire thread.
try {
handle =
txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Initiator.name());
+ startedAt = System.currentTimeMillis();
+ long compactionInterval = (prevStart < 0) ? prevStart : (startedAt -
prevStart) / 1000;
+ prevStart = startedAt;
+
if (metricsEnabled) {
perfLogger.perfLogBegin(CLASS_NAME,
MetricsConstants.COMPACTION_INITIATOR_CYCLE);
+ stopCycleUpdater();
+ startCycleUpdater(HiveConf.getTimeVar(conf,
+
HiveConf.ConfVars.HIVE_COMPACTOR_INITIATOR_DURATION_UPDATE_INTERVAL,
TimeUnit.MILLISECONDS),
+ new
InitiatorCycleUpdater(MetricsConstants.COMPACTION_INITIATOR_CYCLE_DURATION,
+ startedAt,
+ MetastoreConf.getTimeVar(conf,
+
MetastoreConf.ConfVars.COMPACTOR_LONG_RUNNING_INITIATOR_THRESHOLD_WARNING,
+ TimeUnit.MILLISECONDS),
+ MetastoreConf.getTimeVar(conf,
+
MetastoreConf.ConfVars.COMPACTOR_LONG_RUNNING_INITIATOR_THRESHOLD_ERROR,
+ TimeUnit.MILLISECONDS)));
}
- startedAt = System.currentTimeMillis();
-
- long compactionInterval = (prevStart < 0) ? prevStart : (startedAt -
prevStart)/1000;
- prevStart = startedAt;
final ShowCompactResponse currentCompactions =
txnHandler.showCompact(new ShowCompactRequest());
@@ -191,7 +202,9 @@ public class Initiator extends MetaStoreCompactorThread {
}
if (metricsEnabled) {
perfLogger.perfLogEnd(CLASS_NAME,
MetricsConstants.COMPACTION_INITIATOR_CYCLE);
+
updateCycleDurationMetric(MetricsConstants.COMPACTION_INITIATOR_CYCLE_DURATION,
startedAt);
}
+ stopCycleUpdater();
}
long elapsedTime = System.currentTimeMillis() - startedAt;
@@ -558,4 +571,38 @@ public class Initiator extends MetaStoreCompactorThread {
name.append(threadId);
return name.toString();
}
+
+ private static class InitiatorCycleUpdater implements Runnable {
+ private final String metric;
+ private final long startedAt;
+ private final long warningThreshold;
+ private final long errorThreshold;
+
+ private boolean errorReported;
+ private boolean warningReported;
+
+ InitiatorCycleUpdater(String metric, long startedAt,
+ long warningThreshold, long errorThreshold) {
+ this.metric = metric;
+ this.startedAt = startedAt;
+ this.warningThreshold = warningThreshold;
+ this.errorThreshold = errorThreshold;
+ }
+
+ @Override
+ public void run() {
+ long elapsed = updateCycleDurationMetric(metric, startedAt);
+ if (elapsed >= errorThreshold) {
+ if (!errorReported) {
+ LOG.error("Long running Initiator has been detected, duration {}",
elapsed);
+ errorReported = true;
+ }
+ } else if (elapsed >= warningThreshold) {
+ if (!warningReported && !errorReported) {
+ warningReported = true;
+ LOG.warn("Long running Initiator has been detected, duration {}",
elapsed);
+ }
+ }
+ }
+ }
}
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
index 7bf1304..4184caf 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
@@ -17,12 +17,14 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hive.metastore.MetaStoreThread;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.metrics.Metrics;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
@@ -31,6 +33,9 @@ import org.apache.thrift.TException;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.hadoop.hive.metastore.HMSHandler.getMSForConf;
@@ -44,6 +49,7 @@ public class MetaStoreCompactorThread extends CompactorThread
implements MetaSto
protected TxnStore txnHandler;
protected int threadId;
+ protected ScheduledExecutorService cycleUpdaterExecutorService;
@Override
public void setThreadId(int threadId) {
@@ -94,4 +100,37 @@ public class MetaStoreCompactorThread extends
CompactorThread implements MetaSto
throw new MetaException(e.toString());
}
}
+
+ protected void startCycleUpdater(long updateInterval, Runnable taskToRun) {
+ if (cycleUpdaterExecutorService == null) {
+ if (updateInterval > 0) {
+ cycleUpdaterExecutorService =
Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
+ .setPriority(Thread.currentThread().getPriority())
+ .setDaemon(true)
+ .setNameFormat("Cycle-Duration-Updater-%d")
+ .build());
+ cycleUpdaterExecutorService.scheduleAtFixedRate(
+ taskToRun,
+ updateInterval, updateInterval, TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+
+ protected void stopCycleUpdater() {
+ if (cycleUpdaterExecutorService != null) {
+ cycleUpdaterExecutorService.shutdownNow();
+ cycleUpdaterExecutorService = null;
+ }
+ }
+
+ protected static long updateCycleDurationMetric(String metric, long
startedAt) {
+ if (startedAt >= 0) {
+ long elapsed = System.currentTimeMillis() - startedAt;
+ LOG.debug("Updating {} metric to {}", metric, elapsed);
+ Metrics.getOrCreateGauge(metric)
+ .set((int)elapsed);
+ return elapsed;
+ }
+ return 0;
+ }
}
diff --git
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
index 13c97fb..75c722b 100644
---
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
+++
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.txn.compactor;
import com.codahale.metrics.Counter;
+import com.google.common.collect.ImmutableList;
import org.apache.hadoop.hive.common.ServerUtils;
import org.apache.hadoop.hive.common.metrics.MetricsTestUtils;
import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
@@ -434,7 +435,7 @@ public class TestCompactionMetrics extends CompactorTest {
elements.add(generateElement(4,"db", "tb3", "p1", CompactionType.MINOR,
TxnStore.FAILED_RESPONSE));
elements.add(generateElement(6,"db1", "tb", null, CompactionType.MINOR,
TxnStore.FAILED_RESPONSE,
- System.currentTimeMillis(), true, "4.0.0", "4.0.0"));
+ System.currentTimeMillis(), true, "4.0.0", "4.0.0", 10));
elements.add(generateElement(7,"db1", "tb2", null, CompactionType.MINOR,
TxnStore.FAILED_RESPONSE));
elements.add(generateElement(8,"db1", "tb3", null, CompactionType.MINOR,
TxnStore.FAILED_RESPONSE));
@@ -446,12 +447,12 @@ public class TestCompactionMetrics extends CompactorTest
{
elements.add(generateElement(13,"db3", "tb3", null, CompactionType.MINOR,
TxnStore.WORKING_RESPONSE));
// test null initiator version and worker version
elements.add(generateElement(14,"db3", "tb4", null, CompactionType.MINOR,
TxnStore.WORKING_RESPONSE,
- System.currentTimeMillis(), false, null, null));
+ System.currentTimeMillis(), false, null, null,20));
elements.add(generateElement(15,"db3", "tb5", null, CompactionType.MINOR,
TxnStore.WORKING_RESPONSE,
- System.currentTimeMillis(),true, "4.0.0", "4.0.0"));
+ System.currentTimeMillis(),true, "4.0.0", "4.0.0", 30));
elements.add(generateElement(16,"db3", "tb6", null, CompactionType.MINOR,
TxnStore.WORKING_RESPONSE));
elements.add(generateElement(17,"db3", "tb7", null, CompactionType.MINOR,
TxnStore.WORKING_RESPONSE,
- System.currentTimeMillis(),true, "4.0.0", "4.0.0"));
+ System.currentTimeMillis(),true, "4.0.0", "4.0.0",40));
scr.setCompacts(elements);
AcidMetricService.updateMetricsFromShowCompact(scr, conf);
@@ -483,44 +484,91 @@ public class TestCompactionMetrics extends CompactorTest
{
@Test
public void testAgeMetricsNotSet() {
- ShowCompactResponse scr = new ShowCompactResponse();
- List<ShowCompactResponseElement> elements = new ArrayList<>();
- elements.add(generateElement(1, "db", "tb", null, CompactionType.MAJOR,
TxnStore.FAILED_RESPONSE, 1L));
- elements.add(generateElement(5, "db", "tb3", "p1", CompactionType.MINOR,
TxnStore.DID_NOT_INITIATE_RESPONSE, 2L));
- elements.add(generateElement(9, "db2", "tb", null, CompactionType.MINOR,
TxnStore.SUCCEEDED_RESPONSE, 3L));
- elements.add(generateElement(13, "db3", "tb3", null, CompactionType.MINOR,
TxnStore.WORKING_RESPONSE, 4L));
- elements.add(generateElement(14, "db3", "tb4", null, CompactionType.MINOR,
TxnStore.CLEANING_RESPONSE, 5L));
+ List<ShowCompactResponseElement> elements = ImmutableList.of(
+ generateElement(1, "db", "tb", null, CompactionType.MAJOR,
TxnStore.FAILED_RESPONSE, 1L),
+ generateElement(5, "db", "tb3", "p1", CompactionType.MINOR,
TxnStore.DID_NOT_INITIATE_RESPONSE, 2L),
+ generateElement(9, "db2", "tb", null, CompactionType.MINOR,
TxnStore.SUCCEEDED_RESPONSE, 3L)
+ );
+ ShowCompactResponse scr = new ShowCompactResponse();
scr.setCompacts(elements);
AcidMetricService.updateMetricsFromShowCompact(scr, conf);
+
// Check that it is not set
Assert.assertEquals(0,
Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_ENQUEUE_AGE).intValue());
+ Assert.assertEquals(0,
Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_WORKING_AGE).intValue());
+ Assert.assertEquals(0,
Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_CLEANING_AGE).intValue());
}
@Test
- public void testAgeMetricsAge() {
+ public void testInitiatedAgeMetrics() {
ShowCompactResponse scr = new ShowCompactResponse();
- List<ShowCompactResponseElement> elements = new ArrayList<>();
long start = System.currentTimeMillis() - 1000L;
- elements.add(generateElement(15,"db3", "tb5", null, CompactionType.MINOR,
TxnStore.INITIATED_RESPONSE, start));
+ List<ShowCompactResponseElement> elements = ImmutableList.of(
+ generateElement(15, "db3", "tb5", null, CompactionType.MINOR,
TxnStore.INITIATED_RESPONSE, start)
+ );
scr.setCompacts(elements);
AcidMetricService.updateMetricsFromShowCompact(scr, conf);
- long diff = (System.currentTimeMillis() - start)/1000;
+ long diff = (System.currentTimeMillis() - start) / 1000;
+
// Check that we have at least 1s old compaction age, but not more than
expected
-
Assert.assertTrue(Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_ENQUEUE_AGE).intValue()
<= diff);
-
Assert.assertTrue(Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_ENQUEUE_AGE).intValue()
>= 1);
+ int age =
Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_ENQUEUE_AGE).intValue();
+ Assert.assertTrue(age <= diff);
+ Assert.assertTrue(age >= 1);
}
@Test
- public void testAgeMetricsOrder() {
+ public void testWorkingAgeMetrics() {
+ ShowCompactResponse scr = new ShowCompactResponse();
+
+ long start = System.currentTimeMillis() - 1000L;
+ List<ShowCompactResponseElement> elements = ImmutableList.of(
+ generateElement(17, "db3", "tb7", null, CompactionType.MINOR,
TxnStore.WORKING_RESPONSE,
+ System.currentTimeMillis(), true, "4.0.0", "4.0.0", start)
+ );
+
+ scr.setCompacts(elements);
+ AcidMetricService.updateMetricsFromShowCompact(scr, conf);
+ long diff = (System.currentTimeMillis() - start) / 1000;
+
+ // Check that we have at least 1s old compaction age, but not more than
expected
+ int age =
Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_WORKING_AGE).intValue();
+ Assert.assertTrue(age <= diff);
+ Assert.assertTrue(age >= 1);
+ }
+
+ @Test
+ public void testCleaningAgeMetrics() {
+ ShowCompactResponse scr = new ShowCompactResponse();
+
+ long start = System.currentTimeMillis() - 1000L;
+ List<ShowCompactResponseElement> elements = ImmutableList.of(
+ generateElement(19, "db3", "tb7", null, CompactionType.MINOR,
TxnStore.CLEANING_RESPONSE,
+ System.currentTimeMillis(), true, "4.0.0", "4.0.0", -1L, start)
+ );
+
+ scr.setCompacts(elements);
+ AcidMetricService.updateMetricsFromShowCompact(scr, conf);
+ long diff = (System.currentTimeMillis() - start) / 1000;
+
+ // Check that we have at least 1s old compaction age, but not more than
expected
+ int age =
Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_CLEANING_AGE).intValue();
+ Assert.assertTrue(age <= diff);
+ Assert.assertTrue(age >= 1);
+ }
+
+ @Test
+ public void testInitiatedAgeMetricsOrder() {
ShowCompactResponse scr = new ShowCompactResponse();
long start = System.currentTimeMillis();
- List<ShowCompactResponseElement> elements = new ArrayList<>();
- elements.add(generateElement(15,"db3", "tb5", null, CompactionType.MINOR,
TxnStore.INITIATED_RESPONSE,
- start - 1000L));
- elements.add(generateElement(16,"db3", "tb6", null, CompactionType.MINOR,
TxnStore.INITIATED_RESPONSE,
- start - 100000L));
+
+ List<ShowCompactResponseElement> elements = ImmutableList.of(
+ generateElement(15, "db3", "tb5", null, CompactionType.MINOR,
TxnStore.INITIATED_RESPONSE,
+ start - 1_000L),
+ generateElement(16, "db3", "tb6", null, CompactionType.MINOR,
TxnStore.INITIATED_RESPONSE,
+ start - 15_000L)
+ );
scr.setCompacts(elements);
AcidMetricService.updateMetricsFromShowCompact(scr, conf);
@@ -528,14 +576,79 @@ public class TestCompactionMetrics extends CompactorTest
{
Assert.assertTrue(Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_ENQUEUE_AGE).intValue()
> 10);
// Check the reverse order
- elements = new ArrayList<>();
- elements.add(generateElement(16,"db3", "tb6", null, CompactionType.MINOR,
TxnStore.INITIATED_RESPONSE,
- start - 100000L));
- elements.add(generateElement(15,"db3", "tb5", null, CompactionType.MINOR,
TxnStore.INITIATED_RESPONSE,
- start - 1000L));
+ elements = ImmutableList.of(
+ generateElement(16, "db3", "tb6", null, CompactionType.MINOR,
TxnStore.INITIATED_RESPONSE,
+ start - 25_000L),
+ generateElement(15, "db3", "tb5", null, CompactionType.MINOR,
TxnStore.INITIATED_RESPONSE,
+ start - 1_000L)
+ );
+ scr.setCompacts(elements);
+ AcidMetricService.updateMetricsFromShowCompact(scr, conf);
+
+ // Check that the age is older than 20s
+
Assert.assertTrue(Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_ENQUEUE_AGE).intValue()
> 20);
+ }
+
+ @Test
+ public void testWorkingAgeMetricsOrder() {
+ ShowCompactResponse scr = new ShowCompactResponse();
+ long start = System.currentTimeMillis();
+
+ List<ShowCompactResponseElement> elements = ImmutableList.of(
+ generateElement(15, "db3", "tb5", null, CompactionType.MINOR,
TxnStore.WORKING_RESPONSE,
+ start, false, "4.0.0", "4.0.0", start - 1_000L),
+ generateElement(16, "db3", "tb6", null, CompactionType.MINOR,
TxnStore.WORKING_RESPONSE,
+ start, false, "4.0.0", "4.0.0", start - 15_000L)
+ );
+
+ scr.setCompacts(elements);
+ AcidMetricService.updateMetricsFromShowCompact(scr, conf);
+ // Check that the age is older than 10s
+
Assert.assertTrue(Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_WORKING_AGE).intValue()
> 10);
+
+ // Check the reverse order
+ elements = ImmutableList.of(
+ generateElement(16, "db3", "tb6", null, CompactionType.MINOR,
TxnStore.WORKING_RESPONSE,
+ start, false, "4.0.0", "4.0.0", start - 25_000L),
+ generateElement(15, "db3", "tb5", null, CompactionType.MINOR,
TxnStore.WORKING_RESPONSE,
+ start, false, "4.0.0", "4.0.0", start - 1_000L)
+ );
+ scr.setCompacts(elements);
+ AcidMetricService.updateMetricsFromShowCompact(scr, conf);
+
+ // Check that the age is older than 20s
+
Assert.assertTrue(Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_WORKING_AGE).intValue()
> 20);
+ }
+ @Test
+ public void testCleaningAgeMetricsOrder() {
+ ShowCompactResponse scr = new ShowCompactResponse();
+ long start = System.currentTimeMillis();
+
+ List<ShowCompactResponseElement> elements = ImmutableList.of(
+ generateElement(15, "db3", "tb5", null, CompactionType.MINOR,
TxnStore.CLEANING_RESPONSE,
+ start, false, "4.0.0", "4.0.0", -1L, start - 1_000L),
+ generateElement(16, "db3", "tb6", null, CompactionType.MINOR,
TxnStore.CLEANING_RESPONSE,
+ start, false, "4.0.0", "4.0.0", -1L, start - 15_000L)
+ );
+
+ scr.setCompacts(elements);
+ AcidMetricService.updateMetricsFromShowCompact(scr, conf);
// Check that the age is older than 10s
-
Assert.assertTrue(Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_ENQUEUE_AGE).intValue()
> 10);
+
Assert.assertTrue(Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_CLEANING_AGE).intValue()
> 10);
+
+ // Check the reverse order
+ elements = ImmutableList.of(
+ generateElement(16, "db3", "tb6", null, CompactionType.MINOR,
TxnStore.CLEANING_RESPONSE,
+ start, false, "4.0.0", "4.0.0", -1L, start - 25_000L),
+ generateElement(15, "db3", "tb5", null, CompactionType.MINOR,
TxnStore.CLEANING_RESPONSE,
+ start, false, "4.0.0", "4.0.0", -1L, start - 1_000L)
+ );
+ scr.setCompacts(elements);
+ AcidMetricService.updateMetricsFromShowCompact(scr, conf);
+
+ // Check that the age is older than 20s
+
Assert.assertTrue(Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_CLEANING_AGE).intValue()
> 20);
}
@Test
@@ -741,12 +854,19 @@ public class TestCompactionMetrics extends CompactorTest
{
private ShowCompactResponseElement generateElement(long id, String db,
String table, String partition,
CompactionType type, String state, long enqueueTime, boolean
manuallyInitiatedCompaction) {
return generateElement(id, db, table, partition, type, state, enqueueTime,
manuallyInitiatedCompaction,
- null, null);
+ null, null, -1);
+ }
+
+ private ShowCompactResponseElement generateElement(long id, String db,
String table, String partition,
+ CompactionType type, String state, long enqueueTime, boolean
manuallyInitiatedCompaction,
+ String initiatorVersion, String workerVersion, long startTime) {
+ return generateElement(id, db, table, partition, type, state, enqueueTime,
manuallyInitiatedCompaction,
+ initiatorVersion, workerVersion, startTime, -1L);
}
private ShowCompactResponseElement generateElement(long id, String db,
String table, String partition,
CompactionType type, String state, long enqueueTime, boolean
manuallyInitiatedCompaction,
- String initiatorVersion, String workerVersion) {
+ String initiatorVersion, String workerVersion, long startTime, long
cleanerStartTime) {
ShowCompactResponseElement element = new ShowCompactResponseElement(db,
table, type, state);
element.setId(id);
element.setPartitionname(partition);
@@ -764,6 +884,8 @@ public class TestCompactionMetrics extends CompactorTest {
element.setWorkerid(workerId);
element.setInitiatorVersion(initiatorVersion);
element.setWorkerVersion(workerVersion);
+ element.setStart(startTime);
+ element.setCleanerStart(cleanerStartTime);
return element;
}
diff --git a/ql/src/test/results/clientpositive/llap/sysdb.q.out
b/ql/src/test/results/clientpositive/llap/sysdb.q.out
index da1cf94..80b6b49 100644
--- a/ql/src/test/results/clientpositive/llap/sysdb.q.out
+++ b/ql/src/test/results/clientpositive/llap/sysdb.q.out
@@ -465,6 +465,7 @@ columns_v2 column_name
columns_v2 comment
columns_v2 integer_idx
columns_v2 type_name
+compaction_queue cq_cleaner_start
compaction_queue cq_database
compaction_queue cq_enqueue_time
compaction_queue cq_error_message
@@ -484,6 +485,8 @@ compaction_queue cq_worker_id
compaction_queue cq_worker_version
compactions c_catalog
compactions c_catalog
+compactions c_cleaner_start
+compactions c_cleaner_start
compactions c_database
compactions c_database
compactions c_duration
@@ -1552,8 +1555,8 @@ POSTHOOK: Input: sys@compaction_queue
POSTHOOK: Input: sys@compactions
POSTHOOK: Input: sys@completed_compactions
#### A masked pattern was here ####
-1 default default scr_txn NULL major initiated NULL NULL
NULL #Masked# NULL NULL NULL NULL NULL #Masked#
manual 4.0.0-SNAPSHOT
-2 default default scr_txn_2 NULL minor initiated NULL
NULL NULL #Masked# NULL NULL NULL NULL NULL
#Masked# manual 4.0.0-SNAPSHOT
+1 default default scr_txn NULL major initiated NULL NULL
NULL #Masked# NULL NULL NULL NULL NULL #Masked#
manual 4.0.0-SNAPSHOT NULL
+2 default default scr_txn_2 NULL minor initiated NULL
NULL NULL #Masked# NULL NULL NULL NULL NULL
#Masked# manual 4.0.0-SNAPSHOT NULL
PREHOOK: query: use INFORMATION_SCHEMA
PREHOOK: type: SWITCHDATABASE
PREHOOK: Input: database:information_schema
@@ -1781,5 +1784,5 @@ POSTHOOK: Input: sys@dbs
POSTHOOK: Input: sys@tbl_privs
POSTHOOK: Input: sys@tbls
#### A masked pattern was here ####
-1 default default scr_txn NULL major initiated NULL NULL
NULL #Masked# NULL NULL NULL NULL NULL #Masked#
manual 4.0.0-SNAPSHOT
-2 default default scr_txn_2 NULL minor initiated NULL
NULL NULL #Masked# NULL NULL NULL NULL NULL
#Masked# manual 4.0.0-SNAPSHOT
+1 default default scr_txn NULL major initiated NULL NULL
NULL #Masked# NULL NULL NULL NULL NULL #Masked#
manual 4.0.0-SNAPSHOT NULL
+2 default default scr_txn_2 NULL minor initiated NULL
NULL NULL #Masked# NULL NULL NULL NULL NULL
#Masked# manual 4.0.0-SNAPSHOT NULL
diff --git
a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
index 0b50045..71e30ab 100644
---
a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
+++
b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
@@ -27547,6 +27547,11 @@ void
ShowCompactResponseElement::__set_initiatorVersion(const std::string& val)
this->initiatorVersion = val;
__isset.initiatorVersion = true;
}
+
+void ShowCompactResponseElement::__set_cleanerStart(const int64_t val) {
+ this->cleanerStart = val;
+__isset.cleanerStart = true;
+}
std::ostream& operator<<(std::ostream& out, const ShowCompactResponseElement&
obj)
{
obj.printTo(out);
@@ -27725,6 +27730,14 @@ uint32_t
ShowCompactResponseElement::read(::apache::thrift::protocol::TProtocol*
xfer += iprot->skip(ftype);
}
break;
+ case 19:
+ if (ftype == ::apache::thrift::protocol::T_I64) {
+ xfer += iprot->readI64(this->cleanerStart);
+ this->__isset.cleanerStart = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
default:
xfer += iprot->skip(ftype);
break;
@@ -27836,6 +27849,11 @@ uint32_t
ShowCompactResponseElement::write(::apache::thrift::protocol::TProtocol
xfer += oprot->writeString(this->initiatorVersion);
xfer += oprot->writeFieldEnd();
}
+ if (this->__isset.cleanerStart) {
+ xfer += oprot->writeFieldBegin("cleanerStart",
::apache::thrift::protocol::T_I64, 19);
+ xfer += oprot->writeI64(this->cleanerStart);
+ xfer += oprot->writeFieldEnd();
+ }
xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
return xfer;
@@ -27861,6 +27879,7 @@ void swap(ShowCompactResponseElement &a,
ShowCompactResponseElement &b) {
swap(a.workerVersion, b.workerVersion);
swap(a.initiatorId, b.initiatorId);
swap(a.initiatorVersion, b.initiatorVersion);
+ swap(a.cleanerStart, b.cleanerStart);
swap(a.__isset, b.__isset);
}
@@ -27883,6 +27902,7 @@
ShowCompactResponseElement::ShowCompactResponseElement(const ShowCompactResponse
workerVersion = other984.workerVersion;
initiatorId = other984.initiatorId;
initiatorVersion = other984.initiatorVersion;
+ cleanerStart = other984.cleanerStart;
__isset = other984.__isset;
}
ShowCompactResponseElement& ShowCompactResponseElement::operator=(const
ShowCompactResponseElement& other985) {
@@ -27904,6 +27924,7 @@ ShowCompactResponseElement&
ShowCompactResponseElement::operator=(const ShowComp
workerVersion = other985.workerVersion;
initiatorId = other985.initiatorId;
initiatorVersion = other985.initiatorVersion;
+ cleanerStart = other985.cleanerStart;
__isset = other985.__isset;
return *this;
}
@@ -27928,6 +27949,7 @@ void ShowCompactResponseElement::printTo(std::ostream&
out) const {
out << ", " << "workerVersion="; (__isset.workerVersion ? (out <<
to_string(workerVersion)) : (out << "<null>"));
out << ", " << "initiatorId="; (__isset.initiatorId ? (out <<
to_string(initiatorId)) : (out << "<null>"));
out << ", " << "initiatorVersion="; (__isset.initiatorVersion ? (out <<
to_string(initiatorVersion)) : (out << "<null>"));
+ out << ", " << "cleanerStart="; (__isset.cleanerStart ? (out <<
to_string(cleanerStart)) : (out << "<null>"));
out << ")";
}
diff --git
a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
index 3e5eb35..1d4d544 100644
---
a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
+++
b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
@@ -10407,7 +10407,7 @@ void swap(ShowCompactRequest &a, ShowCompactRequest &b);
std::ostream& operator<<(std::ostream& out, const ShowCompactRequest& obj);
typedef struct _ShowCompactResponseElement__isset {
- _ShowCompactResponseElement__isset() : partitionname(false),
workerid(false), start(false), runAs(false), hightestTxnId(false),
metaInfo(false), endTime(false), hadoopJobId(true), id(false),
errorMessage(false), enqueueTime(false), workerVersion(false),
initiatorId(false), initiatorVersion(false) {}
+ _ShowCompactResponseElement__isset() : partitionname(false),
workerid(false), start(false), runAs(false), hightestTxnId(false),
metaInfo(false), endTime(false), hadoopJobId(true), id(false),
errorMessage(false), enqueueTime(false), workerVersion(false),
initiatorId(false), initiatorVersion(false), cleanerStart(false) {}
bool partitionname :1;
bool workerid :1;
bool start :1;
@@ -10422,6 +10422,7 @@ typedef struct _ShowCompactResponseElement__isset {
bool workerVersion :1;
bool initiatorId :1;
bool initiatorVersion :1;
+ bool cleanerStart :1;
} _ShowCompactResponseElement__isset;
class ShowCompactResponseElement : public virtual ::apache::thrift::TBase {
@@ -10429,7 +10430,7 @@ class ShowCompactResponseElement : public virtual
::apache::thrift::TBase {
ShowCompactResponseElement(const ShowCompactResponseElement&);
ShowCompactResponseElement& operator=(const ShowCompactResponseElement&);
- ShowCompactResponseElement() : dbname(), tablename(), partitionname(),
type((CompactionType::type)0), state(), workerid(), start(0), runAs(),
hightestTxnId(0), metaInfo(), endTime(0), hadoopJobId("None"), id(0),
errorMessage(), enqueueTime(0), workerVersion(), initiatorId(),
initiatorVersion() {
+ ShowCompactResponseElement() : dbname(), tablename(), partitionname(),
type((CompactionType::type)0), state(), workerid(), start(0), runAs(),
hightestTxnId(0), metaInfo(), endTime(0), hadoopJobId("None"), id(0),
errorMessage(), enqueueTime(0), workerVersion(), initiatorId(),
initiatorVersion(), cleanerStart(0) {
}
virtual ~ShowCompactResponseElement() noexcept;
@@ -10455,6 +10456,7 @@ class ShowCompactResponseElement : public virtual
::apache::thrift::TBase {
std::string workerVersion;
std::string initiatorId;
std::string initiatorVersion;
+ int64_t cleanerStart;
_ShowCompactResponseElement__isset __isset;
@@ -10494,6 +10496,8 @@ class ShowCompactResponseElement : public virtual
::apache::thrift::TBase {
void __set_initiatorVersion(const std::string& val);
+ void __set_cleanerStart(const int64_t val);
+
bool operator == (const ShowCompactResponseElement & rhs) const
{
if (!(dbname == rhs.dbname))
@@ -10560,6 +10564,10 @@ class ShowCompactResponseElement : public virtual
::apache::thrift::TBase {
return false;
else if (__isset.initiatorVersion && !(initiatorVersion ==
rhs.initiatorVersion))
return false;
+ if (__isset.cleanerStart != rhs.__isset.cleanerStart)
+ return false;
+ else if (__isset.cleanerStart && !(cleanerStart == rhs.cleanerStart))
+ return false;
return true;
}
bool operator != (const ShowCompactResponseElement &rhs) const {
diff --git
a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ShowCompactResponseElement.java
b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ShowCompactResponseElement.java
index dff37fe..d0245ea 100644
---
a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ShowCompactResponseElement.java
+++
b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ShowCompactResponseElement.java
@@ -29,6 +29,7 @@ package org.apache.hadoop.hive.metastore.api;
private static final org.apache.thrift.protocol.TField
WORKER_VERSION_FIELD_DESC = new
org.apache.thrift.protocol.TField("workerVersion",
org.apache.thrift.protocol.TType.STRING, (short)16);
private static final org.apache.thrift.protocol.TField
INITIATOR_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("initiatorId",
org.apache.thrift.protocol.TType.STRING, (short)17);
private static final org.apache.thrift.protocol.TField
INITIATOR_VERSION_FIELD_DESC = new
org.apache.thrift.protocol.TField("initiatorVersion",
org.apache.thrift.protocol.TType.STRING, (short)18);
+ private static final org.apache.thrift.protocol.TField
CLEANER_START_FIELD_DESC = new
org.apache.thrift.protocol.TField("cleanerStart",
org.apache.thrift.protocol.TType.I64, (short)19);
private static final org.apache.thrift.scheme.SchemeFactory
STANDARD_SCHEME_FACTORY = new ShowCompactResponseElementStandardSchemeFactory();
private static final org.apache.thrift.scheme.SchemeFactory
TUPLE_SCHEME_FACTORY = new ShowCompactResponseElementTupleSchemeFactory();
@@ -51,6 +52,7 @@ package org.apache.hadoop.hive.metastore.api;
private @org.apache.thrift.annotation.Nullable java.lang.String
workerVersion; // optional
private @org.apache.thrift.annotation.Nullable java.lang.String initiatorId;
// optional
private @org.apache.thrift.annotation.Nullable java.lang.String
initiatorVersion; // optional
+ private long cleanerStart; // optional
/** The set of fields this struct contains, along with convenience methods
for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -75,7 +77,8 @@ package org.apache.hadoop.hive.metastore.api;
ENQUEUE_TIME((short)15, "enqueueTime"),
WORKER_VERSION((short)16, "workerVersion"),
INITIATOR_ID((short)17, "initiatorId"),
- INITIATOR_VERSION((short)18, "initiatorVersion");
+ INITIATOR_VERSION((short)18, "initiatorVersion"),
+ CLEANER_START((short)19, "cleanerStart");
private static final java.util.Map<java.lang.String, _Fields> byName = new
java.util.HashMap<java.lang.String, _Fields>();
@@ -127,6 +130,8 @@ package org.apache.hadoop.hive.metastore.api;
return INITIATOR_ID;
case 18: // INITIATOR_VERSION
return INITIATOR_VERSION;
+ case 19: // CLEANER_START
+ return CLEANER_START;
default:
return null;
}
@@ -173,8 +178,9 @@ package org.apache.hadoop.hive.metastore.api;
private static final int __ENDTIME_ISSET_ID = 2;
private static final int __ID_ISSET_ID = 3;
private static final int __ENQUEUETIME_ISSET_ID = 4;
+ private static final int __CLEANERSTART_ISSET_ID = 5;
private byte __isset_bitfield = 0;
- private static final _Fields optionals[] =
{_Fields.PARTITIONNAME,_Fields.WORKERID,_Fields.START,_Fields.RUN_AS,_Fields.HIGHTEST_TXN_ID,_Fields.META_INFO,_Fields.END_TIME,_Fields.HADOOP_JOB_ID,_Fields.ID,_Fields.ERROR_MESSAGE,_Fields.ENQUEUE_TIME,_Fields.WORKER_VERSION,_Fields.INITIATOR_ID,_Fields.INITIATOR_VERSION};
+ private static final _Fields optionals[] =
{_Fields.PARTITIONNAME,_Fields.WORKERID,_Fields.START,_Fields.RUN_AS,_Fields.HIGHTEST_TXN_ID,_Fields.META_INFO,_Fields.END_TIME,_Fields.HADOOP_JOB_ID,_Fields.ID,_Fields.ERROR_MESSAGE,_Fields.ENQUEUE_TIME,_Fields.WORKER_VERSION,_Fields.INITIATOR_ID,_Fields.INITIATOR_VERSION,_Fields.CLEANER_START};
public static final java.util.Map<_Fields,
org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap =
new java.util.EnumMap<_Fields,
org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -214,6 +220,8 @@ package org.apache.hadoop.hive.metastore.api;
new
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
tmpMap.put(_Fields.INITIATOR_VERSION, new
org.apache.thrift.meta_data.FieldMetaData("initiatorVersion",
org.apache.thrift.TFieldRequirementType.OPTIONAL,
new
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.CLEANER_START, new
org.apache.thrift.meta_data.FieldMetaData("cleanerStart",
org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ShowCompactResponseElement.class,
metaDataMap);
}
@@ -285,6 +293,7 @@ package org.apache.hadoop.hive.metastore.api;
if (other.isSetInitiatorVersion()) {
this.initiatorVersion = other.initiatorVersion;
}
+ this.cleanerStart = other.cleanerStart;
}
public ShowCompactResponseElement deepCopy() {
@@ -317,6 +326,8 @@ package org.apache.hadoop.hive.metastore.api;
this.workerVersion = null;
this.initiatorId = null;
this.initiatorVersion = null;
+ setCleanerStartIsSet(false);
+ this.cleanerStart = 0;
}
@org.apache.thrift.annotation.Nullable
@@ -749,6 +760,28 @@ package org.apache.hadoop.hive.metastore.api;
}
}
+ public long getCleanerStart() {
+ return this.cleanerStart;
+ }
+
+ public void setCleanerStart(long cleanerStart) {
+ this.cleanerStart = cleanerStart;
+ setCleanerStartIsSet(true);
+ }
+
+ public void unsetCleanerStart() {
+ __isset_bitfield =
org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield,
__CLEANERSTART_ISSET_ID);
+ }
+
+ /** Returns true if field cleanerStart is set (has been assigned a value)
and false otherwise */
+ public boolean isSetCleanerStart() {
+ return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield,
__CLEANERSTART_ISSET_ID);
+ }
+
+ public void setCleanerStartIsSet(boolean value) {
+ __isset_bitfield =
org.apache.thrift.EncodingUtils.setBit(__isset_bitfield,
__CLEANERSTART_ISSET_ID, value);
+ }
+
public void setFieldValue(_Fields field,
@org.apache.thrift.annotation.Nullable java.lang.Object value) {
switch (field) {
case DBNAME:
@@ -895,6 +928,14 @@ package org.apache.hadoop.hive.metastore.api;
}
break;
+ case CLEANER_START:
+ if (value == null) {
+ unsetCleanerStart();
+ } else {
+ setCleanerStart((java.lang.Long)value);
+ }
+ break;
+
}
}
@@ -955,6 +996,9 @@ package org.apache.hadoop.hive.metastore.api;
case INITIATOR_VERSION:
return getInitiatorVersion();
+ case CLEANER_START:
+ return getCleanerStart();
+
}
throw new java.lang.IllegalStateException();
}
@@ -1002,6 +1046,8 @@ package org.apache.hadoop.hive.metastore.api;
return isSetInitiatorId();
case INITIATOR_VERSION:
return isSetInitiatorVersion();
+ case CLEANER_START:
+ return isSetCleanerStart();
}
throw new java.lang.IllegalStateException();
}
@@ -1181,6 +1227,15 @@ package org.apache.hadoop.hive.metastore.api;
return false;
}
+ boolean this_present_cleanerStart = true && this.isSetCleanerStart();
+ boolean that_present_cleanerStart = true && that.isSetCleanerStart();
+ if (this_present_cleanerStart || that_present_cleanerStart) {
+ if (!(this_present_cleanerStart && that_present_cleanerStart))
+ return false;
+ if (this.cleanerStart != that.cleanerStart)
+ return false;
+ }
+
return true;
}
@@ -1260,6 +1315,10 @@ package org.apache.hadoop.hive.metastore.api;
if (isSetInitiatorVersion())
hashCode = hashCode * 8191 + initiatorVersion.hashCode();
+ hashCode = hashCode * 8191 + ((isSetCleanerStart()) ? 131071 : 524287);
+ if (isSetCleanerStart())
+ hashCode = hashCode * 8191 +
org.apache.thrift.TBaseHelper.hashCode(cleanerStart);
+
return hashCode;
}
@@ -1451,6 +1510,16 @@ package org.apache.hadoop.hive.metastore.api;
return lastComparison;
}
}
+ lastComparison = java.lang.Boolean.compare(isSetCleanerStart(),
other.isSetCleanerStart());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetCleanerStart()) {
+ lastComparison =
org.apache.thrift.TBaseHelper.compareTo(this.cleanerStart, other.cleanerStart);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -1623,6 +1692,12 @@ package org.apache.hadoop.hive.metastore.api;
}
first = false;
}
+ if (isSetCleanerStart()) {
+ if (!first) sb.append(", ");
+ sb.append("cleanerStart:");
+ sb.append(this.cleanerStart);
+ first = false;
+ }
sb.append(")");
return sb.toString();
}
@@ -1828,6 +1903,14 @@ package org.apache.hadoop.hive.metastore.api;
org.apache.thrift.protocol.TProtocolUtil.skip(iprot,
schemeField.type);
}
break;
+ case 19: // CLEANER_START
+ if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+ struct.cleanerStart = iprot.readI64();
+ struct.setCleanerStartIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot,
schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot,
schemeField.type);
}
@@ -1949,6 +2032,11 @@ package org.apache.hadoop.hive.metastore.api;
oprot.writeFieldEnd();
}
}
+ if (struct.isSetCleanerStart()) {
+ oprot.writeFieldBegin(CLEANER_START_FIELD_DESC);
+ oprot.writeI64(struct.cleanerStart);
+ oprot.writeFieldEnd();
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -2013,7 +2101,10 @@ package org.apache.hadoop.hive.metastore.api;
if (struct.isSetInitiatorVersion()) {
optionals.set(13);
}
- oprot.writeBitSet(optionals, 14);
+ if (struct.isSetCleanerStart()) {
+ optionals.set(14);
+ }
+ oprot.writeBitSet(optionals, 15);
if (struct.isSetPartitionname()) {
oprot.writeString(struct.partitionname);
}
@@ -2056,6 +2147,9 @@ package org.apache.hadoop.hive.metastore.api;
if (struct.isSetInitiatorVersion()) {
oprot.writeString(struct.initiatorVersion);
}
+ if (struct.isSetCleanerStart()) {
+ oprot.writeI64(struct.cleanerStart);
+ }
}
@Override
@@ -2069,7 +2163,7 @@ package org.apache.hadoop.hive.metastore.api;
struct.setTypeIsSet(true);
struct.state = iprot.readString();
struct.setStateIsSet(true);
- java.util.BitSet incoming = iprot.readBitSet(14);
+ java.util.BitSet incoming = iprot.readBitSet(15);
if (incoming.get(0)) {
struct.partitionname = iprot.readString();
struct.setPartitionnameIsSet(true);
@@ -2126,6 +2220,10 @@ package org.apache.hadoop.hive.metastore.api;
struct.initiatorVersion = iprot.readString();
struct.setInitiatorVersionIsSet(true);
}
+ if (incoming.get(14)) {
+ struct.cleanerStart = iprot.readI64();
+ struct.setCleanerStartIsSet(true);
+ }
}
}
diff --git
a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/ShowCompactResponseElement.php
b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/ShowCompactResponseElement.php
index a05ccf5..a66b43f 100644
---
a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/ShowCompactResponseElement.php
+++
b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/ShowCompactResponseElement.php
@@ -112,6 +112,11 @@ class ShowCompactResponseElement
'isRequired' => false,
'type' => TType::STRING,
),
+ 19 => array(
+ 'var' => 'cleanerStart',
+ 'isRequired' => false,
+ 'type' => TType::I64,
+ ),
);
/**
@@ -186,6 +191,10 @@ class ShowCompactResponseElement
* @var string
*/
public $initiatorVersion = null;
+ /**
+ * @var int
+ */
+ public $cleanerStart = null;
public function __construct($vals = null)
{
@@ -244,6 +253,9 @@ class ShowCompactResponseElement
if (isset($vals['initiatorVersion'])) {
$this->initiatorVersion = $vals['initiatorVersion'];
}
+ if (isset($vals['cleanerStart'])) {
+ $this->cleanerStart = $vals['cleanerStart'];
+ }
}
}
@@ -392,6 +404,13 @@ class ShowCompactResponseElement
$xfer += $input->skip($ftype);
}
break;
+ case 19:
+ if ($ftype == TType::I64) {
+ $xfer += $input->readI64($this->cleanerStart);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
default:
$xfer += $input->skip($ftype);
break;
@@ -496,6 +515,11 @@ class ShowCompactResponseElement
$xfer += $output->writeString($this->initiatorVersion);
$xfer += $output->writeFieldEnd();
}
+ if ($this->cleanerStart !== null) {
+ $xfer += $output->writeFieldBegin('cleanerStart', TType::I64, 19);
+ $xfer += $output->writeI64($this->cleanerStart);
+ $xfer += $output->writeFieldEnd();
+ }
$xfer += $output->writeFieldStop();
$xfer += $output->writeStructEnd();
return $xfer;
diff --git
a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index d4a0917..f80294b 100644
---
a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++
b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@ -15751,11 +15751,12 @@ class ShowCompactResponseElement(object):
- workerVersion
- initiatorId
- initiatorVersion
+ - cleanerStart
"""
- def __init__(self, dbname=None, tablename=None, partitionname=None,
type=None, state=None, workerid=None, start=None, runAs=None,
hightestTxnId=None, metaInfo=None, endTime=None, hadoopJobId="None", id=None,
errorMessage=None, enqueueTime=None, workerVersion=None, initiatorId=None,
initiatorVersion=None,):
+ def __init__(self, dbname=None, tablename=None, partitionname=None,
type=None, state=None, workerid=None, start=None, runAs=None,
hightestTxnId=None, metaInfo=None, endTime=None, hadoopJobId="None", id=None,
errorMessage=None, enqueueTime=None, workerVersion=None, initiatorId=None,
initiatorVersion=None, cleanerStart=None,):
self.dbname = dbname
self.tablename = tablename
self.partitionname = partitionname
@@ -15774,6 +15775,7 @@ class ShowCompactResponseElement(object):
self.workerVersion = workerVersion
self.initiatorId = initiatorId
self.initiatorVersion = initiatorVersion
+ self.cleanerStart = cleanerStart
def read(self, iprot):
if iprot._fast_decode is not None and isinstance(iprot.trans,
TTransport.CReadableTransport) and self.thrift_spec is not None:
@@ -15874,6 +15876,11 @@ class ShowCompactResponseElement(object):
self.initiatorVersion = iprot.readString().decode('utf-8',
errors='replace') if sys.version_info[0] == 2 else iprot.readString()
else:
iprot.skip(ftype)
+ elif fid == 19:
+ if ftype == TType.I64:
+ self.cleanerStart = iprot.readI64()
+ else:
+ iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
@@ -15956,6 +15963,10 @@ class ShowCompactResponseElement(object):
oprot.writeFieldBegin('initiatorVersion', TType.STRING, 18)
oprot.writeString(self.initiatorVersion.encode('utf-8') if
sys.version_info[0] == 2 else self.initiatorVersion)
oprot.writeFieldEnd()
+ if self.cleanerStart is not None:
+ oprot.writeFieldBegin('cleanerStart', TType.I64, 19)
+ oprot.writeI64(self.cleanerStart)
+ oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
@@ -30425,6 +30436,7 @@ ShowCompactResponseElement.thrift_spec = (
(16, TType.STRING, 'workerVersion', 'UTF8', None, ), # 16
(17, TType.STRING, 'initiatorId', 'UTF8', None, ), # 17
(18, TType.STRING, 'initiatorVersion', 'UTF8', None, ), # 18
+ (19, TType.I64, 'cleanerStart', None, None, ), # 19
)
all_structs.append(ShowCompactResponse)
ShowCompactResponse.thrift_spec = (
diff --git
a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
index f08e16f..779898b 100644
---
a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++
b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -4578,6 +4578,7 @@ class ShowCompactResponseElement
WORKERVERSION = 16
INITIATORID = 17
INITIATORVERSION = 18
+ CLEANERSTART = 19
FIELDS = {
DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbname'},
@@ -4597,7 +4598,8 @@ class ShowCompactResponseElement
ENQUEUETIME => {:type => ::Thrift::Types::I64, :name => 'enqueueTime',
:optional => true},
WORKERVERSION => {:type => ::Thrift::Types::STRING, :name =>
'workerVersion', :optional => true},
INITIATORID => {:type => ::Thrift::Types::STRING, :name => 'initiatorId',
:optional => true},
- INITIATORVERSION => {:type => ::Thrift::Types::STRING, :name =>
'initiatorVersion', :optional => true}
+ INITIATORVERSION => {:type => ::Thrift::Types::STRING, :name =>
'initiatorVersion', :optional => true},
+ CLEANERSTART => {:type => ::Thrift::Types::I64, :name => 'cleanerStart',
:optional => true}
}
def struct_fields; FIELDS; end
diff --git
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 448ea6a..2feff8c 100644
---
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -494,6 +494,18 @@ public class MetastoreConf {
12, TimeUnit.HOURS,
"Age of oldest initiated compaction in the compaction queue after
which an error will be logged. " +
"Default time unit is: hours"),
+ COMPACTOR_LONG_RUNNING_INITIATOR_THRESHOLD_WARNING(
+ "metastore.compactor.long.running.initiator.threshold.warning",
+ "hive.compactor.long.running.initiator.threshold.warning",
+ 6, TimeUnit.HOURS,
+ "Initiator cycle duration after which a warning will be logged. " +
+ "Default time unit is: hours"),
+ COMPACTOR_LONG_RUNNING_INITIATOR_THRESHOLD_ERROR(
+ "metastore.compactor.long.running.initiator.threshold.error",
+ "hive.compactor.long.running.initiator.threshold.error",
+ 12, TimeUnit.HOURS,
+ "Initiator cycle duration after which an error will be logged. " +
+ "Default time unit is: hours"),
COMPACTOR_COMPLETED_TXN_COMPONENTS_RECORD_THRESHOLD_WARNING(
"metastore.compactor.completed.txn.components.record.threshold.warning",
"hive.compactor.completed.txn.components.record.threshold.warning",
diff --git
a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
index c33491e..7241cc6 100644
---
a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
+++
b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
@@ -1327,7 +1327,8 @@ struct ShowCompactResponseElement {
15: optional i64 enqueueTime,
16: optional string workerVersion,
17: optional string initiatorId,
- 18: optional string initiatorVersion
+ 18: optional string initiatorVersion,
+ 19: optional i64 cleanerStart
}
struct ShowCompactResponse {
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/AcidMetricService.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/AcidMetricService.java
index 7870c51..ec8208c 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/AcidMetricService.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/AcidMetricService.java
@@ -45,7 +45,9 @@ import static
org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTI
import static
org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_NUM_INITIATOR_VERSIONS;
import static
org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_NUM_WORKERS;
import static
org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_NUM_WORKER_VERSIONS;
+import static
org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_OLDEST_CLEANING_AGE;
import static
org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_OLDEST_ENQUEUE_AGE;
+import static
org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_OLDEST_WORKING_AGE;
import static
org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_STATUS_PREFIX;
import static
org.apache.hadoop.hive.metastore.metrics.MetricsConstants.NUM_ABORTED_TXNS;
import static
org.apache.hadoop.hive.metastore.metrics.MetricsConstants.NUM_COMPLETED_TXN_COMPONENTS;
@@ -268,16 +270,34 @@ public class AcidMetricService implements
MetastoreTaskThread {
public static void updateMetricsFromShowCompact(ShowCompactResponse
showCompactResponse, Configuration conf) {
Map<String, ShowCompactResponseElement> lastElements = new HashMap<>();
long oldestEnqueueTime = Long.MAX_VALUE;
+ long oldestWorkingTime = Long.MAX_VALUE;
+ long oldestCleaningTime = Long.MAX_VALUE;
// Get the last compaction for each db/table/partition
for(ShowCompactResponseElement element :
showCompactResponse.getCompacts()) {
String key = element.getDbname() + "/" + element.getTablename() +
(element.getPartitionname() != null ? "/" +
element.getPartitionname() : "");
+
// If new key, add the element, if there is an existing one, change to
the element if the element.id is greater than old.id
lastElements.compute(key, (k, old) -> (old == null) ? element :
(element.getId() > old.getId() ? element : old));
- if (TxnStore.INITIATED_RESPONSE.equals(element.getState()) &&
oldestEnqueueTime > element.getEnqueueTime()) {
+
+ // find the oldest elements with initiated and working states
+ String state = element.getState();
+ if (TxnStore.INITIATED_RESPONSE.equals(state) && (oldestEnqueueTime >
element.getEnqueueTime())) {
oldestEnqueueTime = element.getEnqueueTime();
}
+
+ if (element.isSetStart()) {
+ if (TxnStore.WORKING_RESPONSE.equals(state) && (oldestWorkingTime >
element.getStart())) {
+ oldestWorkingTime = element.getStart();
+ }
+ }
+
+ if (element.isSetCleanerStart()) {
+ if (TxnStore.CLEANING_RESPONSE.equals(state) && (oldestCleaningTime >
element.getCleanerStart())) {
+ oldestCleaningTime = element.getCleanerStart();
+ }
+ }
}
// Get the current count for each state
@@ -304,24 +324,13 @@ public class AcidMetricService implements
MetastoreTaskThread {
LOG.warn("Many compactions are failing. Check root cause of failed/not
initiated compactions.");
}
- if (oldestEnqueueTime == Long.MAX_VALUE) {
- Metrics.getOrCreateGauge(COMPACTION_OLDEST_ENQUEUE_AGE).set(0);
- } else {
- int oldestEnqueueAge = (int) ((System.currentTimeMillis() -
oldestEnqueueTime) / 1000L);
- Metrics.getOrCreateGauge(COMPACTION_OLDEST_ENQUEUE_AGE)
- .set(oldestEnqueueAge);
- if (oldestEnqueueAge >= MetastoreConf.getTimeVar(conf,
-
MetastoreConf.ConfVars.COMPACTOR_OLDEST_INITIATED_COMPACTION_TIME_THRESHOLD_WARNING,
TimeUnit.SECONDS) &&
- oldestEnqueueAge < MetastoreConf.getTimeVar(conf,
-
MetastoreConf.ConfVars.COMPACTOR_OLDEST_INITIATED_COMPACTION_TIME_THRESHOLD_ERROR,
TimeUnit.SECONDS)) {
- LOG.warn("Found compaction entry in compaction queue with an age of "
+ oldestEnqueueAge + " seconds. " +
- "Consider increasing the number of worker threads.");
- } else if (oldestEnqueueAge >= MetastoreConf.getTimeVar(conf,
-
MetastoreConf.ConfVars.COMPACTOR_OLDEST_INITIATED_COMPACTION_TIME_THRESHOLD_ERROR,
TimeUnit.SECONDS)) {
- LOG.error("Found compaction entry in compaction queue with an age of "
+ oldestEnqueueAge + " seconds. " +
- "Consider increasing the number of worker threads");
- }
- }
+ updateOldestCompactionMetric(COMPACTION_OLDEST_ENQUEUE_AGE,
oldestEnqueueTime, conf,
+ "Found compaction entry in compaction queue with an age of {} seconds.
" +
+ "Consider increasing the number of worker threads.",
+
MetastoreConf.ConfVars.COMPACTOR_OLDEST_INITIATED_COMPACTION_TIME_THRESHOLD_WARNING,
+
MetastoreConf.ConfVars.COMPACTOR_OLDEST_INITIATED_COMPACTION_TIME_THRESHOLD_ERROR);
+ updateOldestCompactionMetric(COMPACTION_OLDEST_WORKING_AGE,
oldestWorkingTime, conf);
+ updateOldestCompactionMetric(COMPACTION_OLDEST_CLEANING_AGE,
oldestCleaningTime, conf);
long initiatorsCount = lastElements.values().stream()
//manually initiated compactions don't count
@@ -340,6 +349,30 @@ public class AcidMetricService implements
MetastoreTaskThread {
Metrics.getOrCreateGauge(COMPACTION_NUM_WORKER_VERSIONS).set((int)
workerVersionsCount);
}
+ private static void updateOldestCompactionMetric(String metricName, long
oldestTime, Configuration conf) {
+ updateOldestCompactionMetric(metricName, oldestTime, conf, null, null,
null);
+ }
+
+ private static void updateOldestCompactionMetric(String metricName, long
oldestTime, Configuration conf,
+ String logMessage, MetastoreConf.ConfVars warningThreshold,
MetastoreConf.ConfVars errorThreshold) {
+ if (oldestTime == Long.MAX_VALUE) {
+ Metrics.getOrCreateGauge(metricName)
+ .set(0);
+ return;
+ }
+
+ int oldestAge = (int) ((System.currentTimeMillis() - oldestTime) / 1000L);
+ Metrics.getOrCreateGauge(metricName)
+ .set(oldestAge);
+ if (logMessage != null) {
+ if (oldestAge >= MetastoreConf.getTimeVar(conf, errorThreshold,
TimeUnit.SECONDS)) {
+ LOG.error(logMessage, oldestAge);
+ } else if (oldestAge >= MetastoreConf.getTimeVar(conf, warningThreshold,
TimeUnit.SECONDS)) {
+ LOG.warn(logMessage, oldestAge);
+ }
+ }
+ }
+
@Override
public void setConf(Configuration configuration) {
this.conf = configuration;
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/MetricsConstants.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/MetricsConstants.java
index 6676684..710f0f9 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/MetricsConstants.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/MetricsConstants.java
@@ -22,9 +22,13 @@ public class MetricsConstants {
public static final String API_PREFIX = "api_";
public static final String COMPACTION_STATUS_PREFIX = "compaction_num_";
public static final String COMPACTION_OLDEST_ENQUEUE_AGE =
"compaction_oldest_enqueue_age_in_sec";
+ public static final String COMPACTION_OLDEST_WORKING_AGE =
"compaction_oldest_working_age_in_sec";
+ public static final String COMPACTION_OLDEST_CLEANING_AGE =
"compaction_oldest_cleaning_age_in_sec";
public static final String COMPACTION_INITIATOR_CYCLE =
"compaction_initiator_cycle";
+ public static final String COMPACTION_INITIATOR_CYCLE_DURATION =
"compaction_initiator_cycle_duration";
public static final String COMPACTION_INITIATOR_FAILURE_COUNTER =
"compaction_initiator_failure_counter";
public static final String COMPACTION_CLEANER_CYCLE =
"compaction_cleaner_cycle";
+ public static final String COMPACTION_CLEANER_CYCLE_DURATION =
"compaction_cleaner_cycle_duration";
public static final String COMPACTION_CLEANER_FAILURE_COUNTER =
"compaction_cleaner_failure_counter";
public static final String COMPACTION_WORKER_CYCLE =
"compaction_worker_cycle";
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 4a9a6ef..9cece8a 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -325,9 +325,9 @@ class CompactionTxnHandler extends TxnHandler {
public List<CompactionInfo> findReadyToClean(long minOpenTxnWaterMark, long
retentionTime) throws MetaException {
try {
List<CompactionInfo> rc = new ArrayList<>();
-
+
try (Connection dbConn =
getDbConn(Connection.TRANSACTION_READ_COMMITTED);
- Statement stmt = dbConn.createStatement()) {
+ Statement stmt = dbConn.createStatement()) {
/*
* By filtering on minOpenTxnWaterMark, we will only cleanup after
every transaction is committed, that could see
* the uncompacted deltas. This way the cleaner can clean up
everything that was made obsolete by this compaction.
@@ -340,20 +340,20 @@ class CompactionTxnHandler extends TxnHandler {
whereClause += " AND \"CQ_COMMIT_TIME\" < (" + getEpochFn(dbProduct)
+ " - " + retentionTime + ")";
}
String s = "SELECT \"CQ_ID\", \"cq1\".\"CQ_DATABASE\",
\"cq1\".\"CQ_TABLE\", \"cq1\".\"CQ_PARTITION\"," +
- " \"CQ_TYPE\", \"CQ_RUN_AS\", \"CQ_HIGHEST_WRITE_ID\",
\"CQ_TBLPROPERTIES\"" +
- " FROM \"COMPACTION_QUEUE\" \"cq1\" " +
- "INNER JOIN (" +
- " SELECT MIN(\"CQ_HIGHEST_WRITE_ID\") \"WRITE_ID\",
\"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\"" +
- " FROM \"COMPACTION_QUEUE\""
- + whereClause +
- " GROUP BY \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\") \"cq2\"
" +
- "ON \"cq1\".\"CQ_DATABASE\" = \"cq2\".\"CQ_DATABASE\""+
- " AND \"cq1\".\"CQ_TABLE\" = \"cq2\".\"CQ_TABLE\""+
- " AND (\"cq1\".\"CQ_PARTITION\" = \"cq2\".\"CQ_PARTITION\"" +
- " OR \"cq1\".\"CQ_PARTITION\" IS NULL AND
\"cq2\".\"CQ_PARTITION\" IS NULL)"
- + whereClause +
- " AND \"CQ_HIGHEST_WRITE_ID\" = \"WRITE_ID\"" +
- " ORDER BY \"CQ_ID\"";
+ " \"CQ_TYPE\", \"CQ_RUN_AS\", \"CQ_HIGHEST_WRITE_ID\",
\"CQ_TBLPROPERTIES\"" +
+ " FROM \"COMPACTION_QUEUE\" \"cq1\" " +
+ "INNER JOIN (" +
+ " SELECT MIN(\"CQ_HIGHEST_WRITE_ID\") \"WRITE_ID\",
\"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\"" +
+ " FROM \"COMPACTION_QUEUE\""
+ + whereClause +
+ " GROUP BY \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\")
\"cq2\" " +
+ "ON \"cq1\".\"CQ_DATABASE\" = \"cq2\".\"CQ_DATABASE\""+
+ " AND \"cq1\".\"CQ_TABLE\" = \"cq2\".\"CQ_TABLE\""+
+ " AND (\"cq1\".\"CQ_PARTITION\" = \"cq2\".\"CQ_PARTITION\"" +
+ " OR \"cq1\".\"CQ_PARTITION\" IS NULL AND
\"cq2\".\"CQ_PARTITION\" IS NULL)"
+ + whereClause +
+ " AND \"CQ_HIGHEST_WRITE_ID\" = \"WRITE_ID\"" +
+ " ORDER BY \"CQ_ID\"";
LOG.debug("Going to execute query <" + s + ">");
try (ResultSet rs = stmt.executeQuery(s)) {
@@ -377,13 +377,110 @@ class CompactionTxnHandler extends TxnHandler {
LOG.error("Unable to select next element for cleaning, " +
e.getMessage());
checkRetryable(e, "findReadyToClean");
throw new MetaException("Unable to connect to transaction database " +
- StringUtils.stringifyException(e));
- }
+ StringUtils.stringifyException(e));
+ }
} catch (RetryException e) {
return findReadyToClean(minOpenTxnWaterMark, retentionTime);
}
}
+
+ /**
+ * Mark the cleaning start time for a particular compaction
+ *
+ * @param info info on the compaction entry
+ */
+ @Override
+ @RetrySemantics.ReadOnly
+ public void markCleanerStart(CompactionInfo info) throws MetaException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Running markCleanerStart with CompactionInfo: " +
info.toString());
+ }
+
+ try {
+ Connection dbConn = null;
+ try {
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ long now = getDbTime(dbConn);
+ setCleanerStart(dbConn, info, now);
+ } catch (SQLException e) {
+ LOG.error("Unable to set the cleaner start time for compaction record
" + e.getMessage());
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(e, "markCleanerStart(" + info + ")");
+ throw new MetaException("Unable to connect to transaction database " +
StringUtils.stringifyException(e));
+ } finally {
+ closeDbConn(dbConn);
+ }
+ } catch (RetryException e) {
+ markCleanerStart(info);
+ }
+ }
+
+ /**
+ * Removes the cleaning start time for a particular compaction
+ *
+ * @param info info on the compaction entry
+ */
+ @Override
+ @RetrySemantics.ReadOnly
+ public void clearCleanerStart(CompactionInfo info) throws MetaException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Running clearCleanerStart with CompactionInfo: " +
info.toString());
+ }
+
+ try {
+ Connection dbConn = null;
+ try {
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ setCleanerStart(dbConn, info, -1L);
+ } catch (SQLException e) {
+ LOG.error("Unable to clear the cleaner start time for compaction
record " + e.getMessage());
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(e, "clearCleanerStart(" + info + ")");
+ throw new MetaException("Unable to connect to transaction database " +
StringUtils.stringifyException(e));
+ } finally {
+ closeDbConn(dbConn);
+ }
+ } catch (RetryException e) {
+ clearCleanerStart(info);
+ }
+ }
+
+ private void setCleanerStart(Connection dbConn, CompactionInfo info, Long
timestamp)
+ throws RetryException, SQLException {
+ long id = info.id;
+ PreparedStatement pStmt = null;
+ ResultSet rs = null;
+ try {
+ String query = "" +
+ " UPDATE " +
+ " \"COMPACTION_QUEUE\" " +
+ " SET " +
+ " \"CQ_CLEANER_START\" = " + timestamp +
+ " WHERE " +
+ " \"CQ_ID\" = " + id +
+ " AND " +
+ " \"CQ_STATE\"='" + READY_FOR_CLEANING + "'";
+
+ pStmt = dbConn.prepareStatement(query);
+ LOG.debug("Going to execute update <" + query + "> for CQ_ID=" + id);
+ int updCount = pStmt.executeUpdate();
+ if (updCount != 1) {
+ LOG.error("Unable to update compaction record: " + info + ". Update
count=" + updCount);
+ LOG.debug("Going to rollback");
+ dbConn.rollback();
+ } else {
+ LOG.debug("Going to commit");
+ dbConn.commit();
+ }
+ } finally {
+ close(rs);
+ closeStmt(pStmt);
+ }
+ }
+
/**
* This will remove an entry from the queue after
* it has been compacted.
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 0d7d573..c9c09ad 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -3703,6 +3703,7 @@ abstract class TxnHandler implements TxnStore,
TxnStore.MutexAPI {
return Character.toString(s);
}
}
+
@RetrySemantics.ReadOnly
public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws
MetaException {
ShowCompactResponse response = new ShowCompactResponse(new ArrayList<>());
@@ -3712,15 +3713,23 @@ abstract class TxnHandler implements TxnStore,
TxnStore.MutexAPI {
try {
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
stmt = dbConn.createStatement();
- String s = "SELECT \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\",
\"CQ_STATE\", \"CQ_TYPE\", \"CQ_WORKER_ID\", " +
- //-1 because 'null' literal doesn't work for all DBs...
- "\"CQ_START\", -1 \"CC_END\", \"CQ_RUN_AS\", \"CQ_HADOOP_JOB_ID\",
\"CQ_ID\", \"CQ_ERROR_MESSAGE\", " +
- "\"CQ_ENQUEUE_TIME\", \"CQ_WORKER_VERSION\", \"CQ_INITIATOR_ID\",
\"CQ_INITIATOR_VERSION\" " +
- "FROM \"COMPACTION_QUEUE\" UNION ALL " +
- "SELECT \"CC_DATABASE\", \"CC_TABLE\", \"CC_PARTITION\",
\"CC_STATE\", \"CC_TYPE\", \"CC_WORKER_ID\", " +
- "\"CC_START\", \"CC_END\", \"CC_RUN_AS\", \"CC_HADOOP_JOB_ID\",
\"CC_ID\", \"CC_ERROR_MESSAGE\", " +
- "\"CC_ENQUEUE_TIME\", \"CC_WORKER_VERSION\", \"CC_INITIATOR_ID\",
\"CC_INITIATOR_VERSION\" " +
- " FROM \"COMPLETED_COMPACTIONS\""; //todo: sort by cq_id?
+ String s = "" +
+ //-1 because 'null' literal doesn't work for all DBs...
+ "SELECT " +
+ " \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", \"CQ_STATE\",
\"CQ_TYPE\", \"CQ_WORKER_ID\", " +
+ " \"CQ_START\", -1 \"CC_END\", \"CQ_RUN_AS\",
\"CQ_HADOOP_JOB_ID\", \"CQ_ID\", \"CQ_ERROR_MESSAGE\", " +
+ " \"CQ_ENQUEUE_TIME\", \"CQ_WORKER_VERSION\",
\"CQ_INITIATOR_ID\", \"CQ_INITIATOR_VERSION\", " +
+ " \"CQ_CLEANER_START\"" +
+ "FROM " +
+ " \"COMPACTION_QUEUE\" " +
+ "UNION ALL " +
+ "SELECT " +
+ " \"CC_DATABASE\", \"CC_TABLE\", \"CC_PARTITION\", \"CC_STATE\",
\"CC_TYPE\", \"CC_WORKER_ID\", " +
+ " \"CC_START\", \"CC_END\", \"CC_RUN_AS\", \"CC_HADOOP_JOB_ID\",
\"CC_ID\", \"CC_ERROR_MESSAGE\", " +
+ " \"CC_ENQUEUE_TIME\", \"CC_WORKER_VERSION\",
\"CC_INITIATOR_ID\", \"CC_INITIATOR_VERSION\", " +
+ " -1 " +
+ "FROM " +
+ " \"COMPLETED_COMPACTIONS\""; //todo: sort by cq_id?
//what I want is order by cc_end desc, cc_start asc (but derby has a
bug https://issues.apache.org/jira/browse/DERBY-6013)
//to sort so that currently running jobs are at the end of the list
(bottom of screen)
//and currently running ones are in sorted by start time
@@ -3740,11 +3749,11 @@ abstract class TxnHandler implements TxnStore,
TxnStore.MutexAPI {
}
e.setWorkerid(rs.getString(6));
long start = rs.getLong(7);
- if(!rs.wasNull()) {
+ if (!rs.wasNull()) {
e.setStart(start);
}
long endTime = rs.getLong(8);
- if(endTime != -1) {
+ if (endTime != -1) {
e.setEndTime(endTime);
}
e.setRunAs(rs.getString(9));
@@ -3752,18 +3761,22 @@ abstract class TxnHandler implements TxnStore,
TxnStore.MutexAPI {
e.setId(rs.getLong(11));
e.setErrorMessage(rs.getString(12));
long enqueueTime = rs.getLong(13);
- if(!rs.wasNull()) {
+ if (!rs.wasNull()) {
e.setEnqueueTime(enqueueTime);
}
e.setWorkerVersion(rs.getString(14));
e.setInitiatorId(rs.getString(15));
e.setInitiatorVersion(rs.getString(16));
+ long cleanerStart = rs.getLong(17);
+ if (!rs.wasNull() && (cleanerStart != -1)) {
+ e.setCleanerStart(cleanerStart);
+ }
response.addToCompacts(e);
}
} catch (SQLException e) {
checkRetryable(e, "showCompact(" + rqst + ")");
throw new MetaException("Unable to select from transaction database " +
- StringUtils.stringifyException(e));
+ StringUtils.stringifyException(e));
} finally {
closeStmt(stmt);
closeDbConn(dbConn);
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index a0af4a8..d325765 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -24,8 +24,52 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.classification.RetrySemantics;
-import org.apache.hadoop.hive.metastore.api.*;
-import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
+import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
+import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
+import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
+import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse;
+import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionResponse;
+import org.apache.hadoop.hive.metastore.api.CreationMetadata;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FindNextCompactRequest;
+import
org.apache.hadoop.hive.metastore.api.GetLatestCommittedCompactionInfoRequest;
+import
org.apache.hadoop.hive.metastore.api.GetLatestCommittedCompactionInfoResponse;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
+import org.apache.hadoop.hive.metastore.api.HeartbeatRequest;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
+import org.apache.hadoop.hive.metastore.api.HiveObjectType;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.Materialization;
+import org.apache.hadoop.hive.metastore.api.MaxAllocatedTableWriteIdRequest;
+import org.apache.hadoop.hive.metastore.api.MaxAllocatedTableWriteIdResponse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
+import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.ReplTblWriteIdStateRequest;
+import org.apache.hadoop.hive.metastore.api.SeedTableWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.SeedTxnIdRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnOpenException;
+import org.apache.hadoop.hive.metastore.api.TxnType;
+import org.apache.hadoop.hive.metastore.api.UnlockRequest;
+import org.apache.hadoop.hive.metastore.api.UpdateTransactionalStatsRequest;
import org.apache.hadoop.hive.metastore.events.ListenerEvent;
import java.sql.SQLException;
@@ -382,7 +426,7 @@ public interface TxnStore extends Configurable {
* @param compactionTxnId - txnid in which Compactor is running
*/
@RetrySemantics.Idempotent
- public void updateCompactorState(CompactionInfo ci, long compactionTxnId)
throws MetaException;
+ void updateCompactorState(CompactionInfo ci, long compactionTxnId) throws
MetaException;
/**
* This will grab the next compaction request off of
@@ -423,6 +467,22 @@ public interface TxnStore extends Configurable {
List<CompactionInfo> findReadyToClean(long minOpenTxnWaterMark, long
retentionTime) throws MetaException;
/**
+ * Sets the cleaning start time for a particular compaction
+ *
+ * @param info info on the compaction entry
+ */
+ @RetrySemantics.CannotRetry
+ void markCleanerStart(CompactionInfo info) throws MetaException;
+
+ /**
+ * Removes the cleaning start time for a particular compaction
+ *
+ * @param info info on the compaction entry
+ */
+ @RetrySemantics.CannotRetry
+ void clearCleanerStart(CompactionInfo info) throws MetaException;
+
+ /**
* This will remove an entry from the queue after
* it has been compacted.
*
diff --git
a/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
b/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
index 0d4ad17..5c49580 100644
---
a/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
+++
b/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
@@ -628,7 +628,8 @@ CREATE TABLE COMPACTION_QUEUE (
CQ_COMMIT_TIME bigint,
CQ_INITIATOR_ID varchar(128),
CQ_INITIATOR_VERSION varchar(128),
- CQ_WORKER_VERSION varchar(128)
+ CQ_WORKER_VERSION varchar(128),
+ CQ_CLEANER_START bigint
);
CREATE TABLE NEXT_COMPACTION_QUEUE_ID (
diff --git
a/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
b/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
index 86c49c4..37e42d8 100644
---
a/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
+++
b/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
@@ -182,5 +182,8 @@ ALTER TABLE "APP"."MV_TABLES_USED" ADD COLUMN
"UPDATED_COUNT" BIGINT NOT NULL DE
ALTER TABLE "APP"."MV_TABLES_USED" ADD COLUMN "DELETED_COUNT" BIGINT NOT NULL
DEFAULT 0;
ALTER TABLE "APP"."MV_TABLES_USED" ADD CONSTRAINT "MV_TABLES_USED_PK" PRIMARY
KEY ("TBL_ID", "MV_CREATION_METADATA_ID");
+-- HIVE-25737
+ALTER TABLE COMPACTION_QUEUE ADD CQ_CLEANER_START bigint;
+
-- This needs to be the last thing done. Insert any changes above this line.
UPDATE "APP".VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release
version 4.0.0' where VER_ID=1;
diff --git
a/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
b/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
index 6a3b174..72a402a 100644
---
a/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
+++
b/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
@@ -1048,6 +1048,7 @@ CREATE TABLE COMPACTION_QUEUE(
CQ_INITIATOR_ID nvarchar(128) NULL,
CQ_INITIATOR_VERSION nvarchar(128) NULL,
CQ_WORKER_VERSION nvarchar(128) NULL,
+ CQ_CLEANER_START bigint NULL,
PRIMARY KEY CLUSTERED
(
CQ_ID ASC
diff --git
a/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
b/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
index 296cca5..5460d7a 100644
---
a/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
+++
b/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
@@ -233,6 +233,9 @@ ALTER TABLE "MV_TABLES_USED" ADD "UPDATED_COUNT" BIGINT NOT
NULL DEFAULT 0;
ALTER TABLE "MV_TABLES_USED" ADD "DELETED_COUNT" BIGINT NOT NULL DEFAULT 0;
ALTER TABLE "MV_TABLES_USED" ADD CONSTRAINT "MV_TABLES_USED_PK" PRIMARY KEY
("TBL_ID", "MV_CREATION_METADATA_ID");
+-- HIVE-25737
+ALTER TABLE COMPACTION_QUEUE ADD CQ_CLEANER_START bigint NULL;
+
-- These lines need to be last. Insert any changes above.
UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release
version 4.0.0' where VER_ID=1;
SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS MESSAGE;
diff --git
a/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
b/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
index 0cbc091..2ab4cf6 100644
---
a/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
+++
b/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
@@ -1089,7 +1089,8 @@ CREATE TABLE COMPACTION_QUEUE (
CQ_COMMIT_TIME bigint,
CQ_INITIATOR_ID varchar(128),
CQ_INITIATOR_VERSION varchar(128),
- CQ_WORKER_VERSION varchar(128)
+ CQ_WORKER_VERSION varchar(128),
+ CQ_CLEANER_START bigint
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
CREATE TABLE COMPLETED_COMPACTIONS (
diff --git
a/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
b/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
index 1d3a1f06..89b6a91 100644
---
a/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
+++
b/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
@@ -210,6 +210,9 @@ ALTER TABLE `MV_TABLES_USED` ADD COLUMN `UPDATED_COUNT`
bigint(20) NOT NULL DEFA
ALTER TABLE `MV_TABLES_USED` ADD COLUMN `DELETED_COUNT` bigint(20) NOT NULL
DEFAULT 0;
ALTER TABLE `MV_TABLES_USED` ADD CONSTRAINT `MV_TABLES_USED_PK` PRIMARY KEY
(`TBL_ID`, `MV_CREATION_METADATA_ID`);
+-- HIVE-25737
+ALTER TABLE COMPACTION_QUEUE ADD CQ_CLEANER_START bigint;
+
-- These lines need to be last. Insert any changes above.
UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release
version 4.0.0' where VER_ID=1;
SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS MESSAGE;
diff --git
a/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
b/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
index a208154..055f101 100644
---
a/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
+++
b/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
@@ -1091,7 +1091,8 @@ CREATE TABLE COMPACTION_QUEUE (
CQ_COMMIT_TIME NUMBER(19),
CQ_INITIATOR_ID varchar(128),
CQ_INITIATOR_VERSION varchar(128),
- CQ_WORKER_VERSION varchar(128)
+ CQ_WORKER_VERSION varchar(128),
+ CQ_CLEANER_START NUMBER(19)
) ROWDEPENDENCIES;
CREATE TABLE NEXT_COMPACTION_QUEUE_ID (
diff --git
a/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
b/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
index d5810de..cce71a3 100644
---
a/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
+++
b/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
@@ -207,7 +207,9 @@ ALTER TABLE MV_TABLES_USED ADD UPDATED_COUNT NUMBER DEFAULT
0 NOT NULL;
ALTER TABLE MV_TABLES_USED ADD DELETED_COUNT NUMBER DEFAULT 0 NOT NULl;
ALTER TABLE MV_TABLES_USED ADD CONSTRAINT MV_TABLES_USED_PK PRIMARY KEY
(TBL_ID, MV_CREATION_METADATA_ID);
+-- HIVE-25737
+ALTER TABLE COMPACTION_QUEUE ADD CQ_CLEANER_START NUMBER(19);
+
-- These lines need to be last. Insert any changes above.
UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release
version 4.0.0' where VER_ID=1;
SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS Status
from dual;
-
diff --git
a/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
b/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
index 6e83ff1..639f18d 100644
---
a/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
+++
b/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
@@ -1800,7 +1800,8 @@ CREATE TABLE "COMPACTION_QUEUE" (
"CQ_COMMIT_TIME" bigint,
"CQ_INITIATOR_ID" varchar(128),
"CQ_INITIATOR_VERSION" varchar(128),
- "CQ_WORKER_VERSION" varchar(128)
+ "CQ_WORKER_VERSION" varchar(128),
+ "CQ_CLEANER_START" bigint
);
CREATE TABLE "NEXT_COMPACTION_QUEUE_ID" (
diff --git
a/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
b/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
index 83f8336..a2c0a81 100644
---
a/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
+++
b/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
@@ -344,7 +344,9 @@ ALTER TABLE "MV_TABLES_USED" ADD "UPDATED_COUNT" bigint NOT
NULL DEFAULT 0;
ALTER TABLE "MV_TABLES_USED" ADD "DELETED_COUNT" bigint NOT NULL DEFAULT 0;
ALTER TABLE "MV_TABLES_USED" ADD CONSTRAINT "MV_TABLES_USED_PK" PRIMARY KEY
("TBL_ID", "MV_CREATION_METADATA_ID");
+-- HIVE-25737
+ALTER TABLE "COMPACTION_QUEUE" ADD "CQ_CLEANER_START" bigint;
+
-- These lines need to be last. Insert any changes above.
UPDATE "VERSION" SET "SCHEMA_VERSION"='4.0.0', "VERSION_COMMENT"='Hive release
version 4.0.0' where "VER_ID"=1;
SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0';
-
diff --git
a/standalone-metastore/metastore-server/src/test/resources/sql/postgres/upgrade-3.1.3000-to-4.0.0.postgres.sql
b/standalone-metastore/metastore-server/src/test/resources/sql/postgres/upgrade-3.1.3000-to-4.0.0.postgres.sql
index 21f8061..e6507bd 100644
---
a/standalone-metastore/metastore-server/src/test/resources/sql/postgres/upgrade-3.1.3000-to-4.0.0.postgres.sql
+++
b/standalone-metastore/metastore-server/src/test/resources/sql/postgres/upgrade-3.1.3000-to-4.0.0.postgres.sql
@@ -115,6 +115,9 @@ ALTER TABLE "DBS" ADD "DATACONNECTOR_NAME" character
varying(128);
ALTER TABLE "DBS" ADD "REMOTE_DBNAME" character varying(128);
UPDATE "DBS" SET "TYPE"= 'NATIVE' WHERE "TYPE" IS NULL;
+-- HIVE-25737
+ALTER TABLE "COMPACTION_QUEUE" ADD "CQ_CLEANER_START" bigint;
+
-- These lines need to be last. Insert any changes above.
UPDATE "VERSION" SET "SCHEMA_VERSION"='4.0.0', "VERSION_COMMENT"='Hive release
version 4.0.0' where "VER_ID"=1;
SELECT 'Finished upgrading MetaStore schema from 3.1.3000 to 4.0.0';