This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new bfdaeab HIVE-23624: Add metastore metrics to show the compaction
status (#1064) (Peter Vary reviewed by Laszlo Pinter)
bfdaeab is described below
commit bfdaeabb85ec4d91a816f50a44927e2e17cb1b5b
Author: pvary <[email protected]>
AuthorDate: Tue Jun 9 09:58:35 2020 +0200
HIVE-23624: Add metastore metrics to show the compaction status (#1064)
(Peter Vary reviewed by Laszlo Pinter)
HIVE-23624: Add metastore metrics to show the compaction status (#1064)
(Peter Vary reviewed by Laszlo Pinter)
---
.../hive/ql/txn/compactor/CompactorThread.java | 1 -
.../hadoop/hive/ql/txn/compactor/Initiator.java | 38 +++++-
.../hive/ql/txn/compactor/TestInitiator.java | 148 +++++++++++++++++++++
.../hive/metastore/metrics/MetricsConstants.java | 1 +
.../apache/hadoop/hive/metastore/txn/TxnStore.java | 3 +
5 files changed, 189 insertions(+), 2 deletions(-)
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
index fb23c2f..1b0af0e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.RawStore;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 7913295..5b2c937 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -39,6 +40,8 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.metrics.Metrics;
+import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
@@ -92,6 +95,7 @@ public class Initiator extends MetaStoreCompactorThread {
long abortedTimeThreshold = HiveConf
.getTimeVar(conf,
HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD,
TimeUnit.MILLISECONDS);
+ boolean metricsEnabled = MetastoreConf.getBoolVar(conf,
MetastoreConf.ConfVars.METRICS_ENABLED);
// Make sure we run through the loop once before checking to stop as
this makes testing
// much easier. The stop value is only for testing anyway and not used
when called from
@@ -109,9 +113,13 @@ public class Initiator extends MetaStoreCompactorThread {
long compactionInterval = (prevStart < 0) ? prevStart : (startedAt -
prevStart)/1000;
prevStart = startedAt;
- //todo: add method to only get current i.e. skip history - more
efficient
ShowCompactResponse currentCompactions = txnHandler.showCompact(new
ShowCompactRequest());
+ if (metricsEnabled) {
+ // Update compaction metrics based on showCompactions result
+ updateCompactionMetrics(currentCompactions);
+ }
+
Set<CompactionInfo> potentials =
txnHandler.findPotentialCompactions(abortedThreshold,
abortedTimeThreshold, compactionInterval)
.stream()
@@ -487,4 +495,32 @@ public class Initiator extends MetaStoreCompactorThread {
}
return true;
}
+
+ @VisibleForTesting
+ protected static void updateCompactionMetrics(ShowCompactResponse
showCompactResponse) {
+ Map<String, ShowCompactResponseElement> lastElements = new HashMap<>();
+
+ // Get the last compaction for each db/table/partition
+ for(ShowCompactResponseElement element :
showCompactResponse.getCompacts()) {
+ String key = element.getDbname() + "/" + element.getTablename() +
+ (element.getPartitionname() != null ? "/" +
element.getPartitionname() : "");
+ // If new key, add the element, if there is an existing one, change to
the element if the element.id is greater than old.id
+ lastElements.compute(key, (k, old) -> (old == null) ? element :
(element.getId() > old.getId() ? element : old));
+ }
+
+ // Get the current count for each state
+ Map<String, Long> counts = lastElements.values().stream()
+ .collect(Collectors.groupingBy(e -> e.getState(),
Collectors.counting()));
+
+ // Update metrics
+ for (int i = 0; i < TxnStore.COMPACTION_STATES.length; ++i) {
+ String key = MetricsConstants.COMPACTION_STATUS_PREFIX +
TxnStore.COMPACTION_STATES[i];
+ Long count = counts.get(TxnStore.COMPACTION_STATES[i]);
+ if (count != null) {
+ Metrics.getOrCreateGauge(key).set(count.intValue());
+ } else {
+ Metrics.getOrCreateGauge(key).set(0);
+ }
+ }
+ }
}
diff --git
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
index 279de19..f3ab5ce 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -34,6 +34,10 @@ import
org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.metrics.Metrics;
+import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@@ -50,6 +54,7 @@ import java.util.concurrent.TimeUnit;
* Tests for the compactor Initiator thread.
*/
public class TestInitiator extends CompactorTest {
+ private final String INITIATED_METRICS_KEY =
MetricsConstants.COMPACTION_STATUS_PREFIX + TxnStore.INITIATED_RESPONSE;
@Test
public void nothing() throws Exception {
@@ -817,6 +822,149 @@ public class TestInitiator extends CompactorTest {
Assert.assertEquals(10, compacts.size());
}
+ @Test
+ public void testInitiatorMetricsEnabled() throws Exception {
+ MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED,
true);
+ Metrics.initialize(conf);
+ int originalValue =
Metrics.getOrCreateGauge(INITIATED_METRICS_KEY).intValue();
+ Table t = newTable("default", "ime", true);
+ List<LockComponent> components = new ArrayList<>();
+
+ for (int i = 0; i < 10; i++) {
+ Partition p = newPartition(t, "part" + (i + 1));
+ addBaseFile(t, p, 20L, 20);
+ addDeltaFile(t, p, 21L, 22L, 2);
+ addDeltaFile(t, p, 23L, 24L, 2);
+
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE,
LockLevel.PARTITION, "default");
+ comp.setTablename("ime");
+ comp.setPartitionname("ds=part" + (i + 1));
+ comp.setOperationType(DataOperationType.UPDATE);
+ components.add(comp);
+ }
+ burnThroughTransactions("default", "ime", 23);
+ long txnid = openTxn();
+
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(txnid);
+ LockResponse res = txnHandler.lock(req);
+ Assert.assertEquals(LockState.ACQUIRED, res.getState());
+
+ long writeid = allocateWriteId("default", "ime", txnid);
+ Assert.assertEquals(24, writeid);
+ txnHandler.commitTxn(new CommitTxnRequest(txnid));
+
+ startInitiator();
+
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ Assert.assertEquals(10, compacts.size());
+
+ // The metrics will appear after the next Initiator run
+ startInitiator();
+
+ Assert.assertEquals(originalValue + 10,
+ Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_STATUS_PREFIX +
TxnStore.INITIATED_RESPONSE).intValue());
+ }
+
+ @Test
+ public void testInitiatorMetricsDisabled() throws Exception {
+ MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED,
false);
+ Metrics.initialize(conf);
+ int originalValue =
Metrics.getOrCreateGauge(INITIATED_METRICS_KEY).intValue();
+ Table t = newTable("default", "imd", true);
+ List<LockComponent> components = new ArrayList<>();
+
+ for (int i = 0; i < 10; i++) {
+ Partition p = newPartition(t, "part" + (i + 1));
+ addBaseFile(t, p, 20L, 20);
+ addDeltaFile(t, p, 21L, 22L, 2);
+ addDeltaFile(t, p, 23L, 24L, 2);
+
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE,
LockLevel.PARTITION, "default");
+ comp.setTablename("imd");
+ comp.setPartitionname("ds=part" + (i + 1));
+ comp.setOperationType(DataOperationType.UPDATE);
+ components.add(comp);
+ }
+ burnThroughTransactions("default", "imd", 23);
+ long txnid = openTxn();
+
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(txnid);
+ LockResponse res = txnHandler.lock(req);
+ Assert.assertEquals(LockState.ACQUIRED, res.getState());
+
+ long writeid = allocateWriteId("default", "imd", txnid);
+ Assert.assertEquals(24, writeid);
+ txnHandler.commitTxn(new CommitTxnRequest(txnid));
+
+ startInitiator();
+
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ Assert.assertEquals(10, compacts.size());
+
+ // The metrics will appear after the next Initiator run
+ startInitiator();
+
+ Assert.assertEquals(originalValue,
+ Metrics.getOrCreateGauge(INITIATED_METRICS_KEY).intValue());
+ }
+
+ @Test
+ public void testUpdateCompactionMetrics() {
+ Metrics.initialize(conf);
+ ShowCompactResponse scr = new ShowCompactResponse();
+ List<ShowCompactResponseElement> elements = new ArrayList<>();
+ elements.add(generateElement(1,"db", "tb", null, CompactionType.MAJOR,
TxnStore.FAILED_RESPONSE));
+ // Check for overwrite
+ elements.add(generateElement(2,"db", "tb", null, CompactionType.MAJOR,
TxnStore.INITIATED_RESPONSE));
+ elements.add(generateElement(3,"db", "tb2", null, CompactionType.MINOR,
TxnStore.INITIATED_RESPONSE));
+ elements.add(generateElement(5,"db", "tb3", "p1", CompactionType.MINOR,
TxnStore.ATTEMPTED_RESPONSE));
+ // Check for overwrite where the order is different
+ elements.add(generateElement(4,"db", "tb3", "p1", CompactionType.MINOR,
TxnStore.FAILED_RESPONSE));
+
+ elements.add(generateElement(6,"db1", "tb", null, CompactionType.MINOR,
TxnStore.FAILED_RESPONSE));
+ elements.add(generateElement(7,"db1", "tb2", null, CompactionType.MINOR,
TxnStore.FAILED_RESPONSE));
+ elements.add(generateElement(8,"db1", "tb3", null, CompactionType.MINOR,
TxnStore.FAILED_RESPONSE));
+
+ elements.add(generateElement(9,"db2", "tb", null, CompactionType.MINOR,
TxnStore.SUCCEEDED_RESPONSE));
+ elements.add(generateElement(10,"db2", "tb2", null, CompactionType.MINOR,
TxnStore.SUCCEEDED_RESPONSE));
+ elements.add(generateElement(11,"db2", "tb3", null, CompactionType.MINOR,
TxnStore.SUCCEEDED_RESPONSE));
+ elements.add(generateElement(12,"db2", "tb4", null, CompactionType.MINOR,
TxnStore.SUCCEEDED_RESPONSE));
+
+ elements.add(generateElement(13,"db3", "tb3", null, CompactionType.MINOR,
TxnStore.WORKING_RESPONSE));
+ elements.add(generateElement(14,"db3", "tb4", null, CompactionType.MINOR,
TxnStore.WORKING_RESPONSE));
+ elements.add(generateElement(15,"db3", "tb5", null, CompactionType.MINOR,
TxnStore.WORKING_RESPONSE));
+ elements.add(generateElement(16,"db3", "tb6", null, CompactionType.MINOR,
TxnStore.WORKING_RESPONSE));
+ elements.add(generateElement(17,"db3", "tb7", null, CompactionType.MINOR,
TxnStore.WORKING_RESPONSE));
+
+ scr.setCompacts(elements);
+ Initiator.updateCompactionMetrics(scr);
+
+ Assert.assertEquals(1,
+ Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_STATUS_PREFIX +
TxnStore.ATTEMPTED_RESPONSE).intValue());
+ Assert.assertEquals(2,
+ Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_STATUS_PREFIX +
TxnStore.INITIATED_RESPONSE).intValue());
+ Assert.assertEquals(3,
+ Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_STATUS_PREFIX +
TxnStore.FAILED_RESPONSE).intValue());
+ Assert.assertEquals(4,
+ Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_STATUS_PREFIX +
TxnStore.SUCCEEDED_RESPONSE).intValue());
+ Assert.assertEquals(5,
+ Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_STATUS_PREFIX +
TxnStore.WORKING_RESPONSE).intValue());
+ Assert.assertEquals(0,
+ Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_STATUS_PREFIX +
TxnStore.CLEANING_RESPONSE).intValue());
+ }
+
+ private ShowCompactResponseElement generateElement(long id, String db,
String table, String partition,
+ CompactionType type, String state) {
+ ShowCompactResponseElement element = new ShowCompactResponseElement(db,
table, type, state);
+ element.setId(id);
+ element.setPartitionname(partition);
+ return element;
+ }
+
@Override
boolean useHive130DeltaDirName() {
return false;
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/MetricsConstants.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/MetricsConstants.java
index 24c8c4c..7ae98fe 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/MetricsConstants.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/MetricsConstants.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.metastore.metrics;
public class MetricsConstants {
public static final String ACTIVE_CALLS = "active_calls_";
public static final String API_PREFIX = "api_";
+ public static final String COMPACTION_STATUS_PREFIX = "compaction_num_";
public static final String TOTAL_API_CALLS = "total_api_calls";
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 3e441b5..0d5b669 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -56,6 +56,9 @@ public interface TxnStore extends Configurable {
String SUCCEEDED_RESPONSE = "succeeded";
String ATTEMPTED_RESPONSE = "attempted";
+ String[] COMPACTION_STATES = new String[] {INITIATED_RESPONSE,
WORKING_RESPONSE, CLEANING_RESPONSE, FAILED_RESPONSE,
+ SUCCEEDED_RESPONSE, ATTEMPTED_RESPONSE};
+
int TIMED_OUT_TXN_ABORT_BATCH_SIZE = 50000;
/**