This is an automated email from the ASF dual-hosted git repository.
klcopp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 1d422c7 HIVE-25914: Fix of Cleaner updates Initiator cycle metric
(Viktor Csomor, reviewed by Karen Coppage)
1d422c7 is described below
commit 1d422c7677f37e77b667565471db0df8e6f69c9c
Author: Viktor Csomor <[email protected]>
AuthorDate: Mon Jan 31 14:45:59 2022 +0100
HIVE-25914: Fix of Cleaner updates Initiator cycle metric (Viktor Csomor,
reviewed by Karen Coppage)
Cleaner code fixed to update the correct metric.
Some test have been added.
Closes #2988.
---
.../hadoop/hive/ql/txn/compactor/Cleaner.java | 2 +-
.../ql/txn/compactor/TestCompactionMetrics.java | 71 ++++++++++++++++++++++
2 files changed, 72 insertions(+), 1 deletion(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index df132e4..6dbf08b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -171,7 +171,7 @@ public class Cleaner extends MetaStoreCompactorThread {
handle.releaseLocks();
}
if (metricsEnabled) {
-
updateCycleDurationMetric(MetricsConstants.COMPACTION_INITIATOR_CYCLE_DURATION,
startedAt);
+
updateCycleDurationMetric(MetricsConstants.COMPACTION_CLEANER_CYCLE_DURATION,
startedAt);
}
stopCycleUpdater();
}
diff --git
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
index 90eff22..7a33176 100644
---
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
+++
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
@@ -850,6 +850,77 @@ public class TestCompactionMetrics extends CompactorTest {
2,
Metrics.getOrCreateGauge(MetricsConstants.WRITES_TO_DISABLED_COMPACTION_TABLE).intValue());
}
+ @Test
+ public void testInitiatorDurationMeasuredCorrectly() throws Exception {
+ final String DEFAULT_DB = "default";
+ final String TABLE_NAME = "x_table";
+ final String PARTITION_NAME = "part";
+
+ List<LockComponent> components = new ArrayList<>();
+
+ Table table = newTable(DEFAULT_DB, TABLE_NAME, true);
+
+ for (int i = 0; i < 10; i++) {
+ String partitionName = PARTITION_NAME + i;
+ Partition p = newPartition(table, partitionName);
+
+ addBaseFile(table, p, 20L, 20);
+ addDeltaFile(table, p, 21L, 22L, 2);
+ addDeltaFile(table, p, 23L, 24L, 2);
+ addDeltaFile(table, p, 21L, 24L, 4);
+
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE,
LockLevel.PARTITION, DEFAULT_DB);
+ comp.setTablename(TABLE_NAME);
+ comp.setPartitionname("ds=" + partitionName);
+ comp.setOperationType(DataOperationType.UPDATE);
+ components.add(comp);
+ }
+ burnThroughTransactions(DEFAULT_DB, TABLE_NAME, 25);
+
+ long txnId = openTxn();
+
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(txnId);
+ LockResponse res = txnHandler.lock(req);
+ Assert.assertEquals(LockState.ACQUIRED, res.getState());
+
+ allocateWriteId(DEFAULT_DB, TABLE_NAME, txnId);
+ txnHandler.commitTxn(new CommitTxnRequest(txnId));
+
+ long initiatorStart = System.currentTimeMillis();
+ startInitiator();
+ long durationUpperLimit = System.currentTimeMillis() - initiatorStart;
+ int initiatorDurationFromMetric =
Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_INITIATOR_CYCLE_DURATION)
+ .intValue();
+ Assert.assertTrue("Initiator duration must be withing the limits",
+ (0 < initiatorDurationFromMetric) && (initiatorDurationFromMetric <=
durationUpperLimit));
+ }
+
+ @Test
+ public void testCleanerDurationMeasuredCorrectly() throws Exception {
+ conf.setIntVar(HiveConf.ConfVars.COMPACTOR_MAX_NUM_DELTA, 1);
+
+ final String DB_NAME = "default";
+ final String TABLE_NAME = "x_table";
+ final String PARTITION_NAME = "part";
+
+ Table table = newTable(DB_NAME, TABLE_NAME, true);
+ Partition partition = newPartition(table, PARTITION_NAME);
+ addBaseFile(table, partition, 20L, 20);
+ addDeltaFile(table, partition, 21L, 22L, 2);
+ addDeltaFile(table, partition, 23L, 24L, 2);
+ burnThroughTransactions(DB_NAME, TABLE_NAME, 25);
+ doCompaction(DB_NAME, TABLE_NAME, PARTITION_NAME, CompactionType.MINOR);
+
+ long cleanerStart = System.currentTimeMillis();
+ startCleaner();
+ long durationUpperLimit = System.currentTimeMillis() - cleanerStart;
+ int cleanerDurationFromMetric =
Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_CLEANER_CYCLE_DURATION)
+ .intValue();
+ Assert.assertTrue("Cleaner duration must be withing the limits",
+ (0 < cleanerDurationFromMetric) && (cleanerDurationFromMetric <=
durationUpperLimit));
+ }
+
private ShowCompactResponseElement generateElement(long id, String db,
String table, String partition,
CompactionType type, String state) {
return generateElement(id, db, table, partition, type, state,
System.currentTimeMillis());