lcspinter commented on a change in pull request #2916:
URL: https://github.com/apache/hive/pull/2916#discussion_r786756287
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -142,6 +144,9 @@ public void init(AtomicBoolean stop) throws Exception {
     super.init(stop);
     this.workerName = getWorkerId();
     setName(workerName);
+    metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&
Review comment:
Done
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -87,14 +106,15 @@ public void init(AtomicBoolean stop) throws Exception {
     cleanerExecutor = CompactorUtil.createExecutorWithThreadFactory(
         conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_THREADS_NUM),
         COMPACTOR_CLEANER_THREAD_NAME_FORMAT);
+    metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&
+        MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON) &&
+        MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON);
Review comment:
It doesn't hurt if we double check :)
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -142,6 +144,9 @@ public void init(AtomicBoolean stop) throws Exception {
     super.init(stop);
     this.workerName = getWorkerId();
     setName(workerName);
+    metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&
+        MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON) &&
+        MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON);
Review comment:
Do we want to update metrics when the Initiator/Cleaner is not running?
Can that be a valid use case?
##########
File path: ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
##########
@@ -81,6 +81,7 @@
   public void setUp() throws Exception {
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED, true);
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_LEVEL, true);
+    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
Review comment:
We need this flag set to `true`; otherwise the metrics are not collected.
##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
##########
@@ -8831,6 +8833,34 @@ public void mark_failed(CompactionInfoStruct cr) throws MetaException {
     getTxnHandler().markFailed(CompactionInfo.compactionStructToInfo(cr));
   }
+  @Override
+  public CompactionMetricsDataResponse get_compaction_metrics_data(String dbName, String tblName, String partitionName, CompactionMetricsMetricType type) throws MetaException {
+    CompactionMetricsData metricsData =
+        getTxnHandler().getCompactionMetricsData(dbName, tblName, partitionName,
+            CompactionMetricsDataConverter.thriftCompactionMetricType2DbType(type));
+    CompactionMetricsDataResponse response = new CompactionMetricsDataResponse();
+    if (metricsData != null) {
+      response.setData(CompactionMetricsDataConverter.dataToStruct(metricsData));
+    }
+    return response;
+  }
+
+  @Override
+  public boolean update_compaction_metrics_data(CompactionMetricsDataStruct struct, int version) throws MetaException {
+    return getTxnHandler().updateCompactionMetricsData(CompactionMetricsDataConverter.structToData(struct), version);
Review comment:
Per the Javadoc, the object must always be non-null.
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -671,6 +679,13 @@ private String getWorkerId() {
     return name.toString();
   }
+  private void updateDeltaFilesMetrics(AcidDirectory directory, String dbName, String tableName, String partName,
+      CompactionType type) {
+    if (metricsEnabled) {
+      DeltaFilesMetricReporter.updateMetricsFromWorker(directory, dbName, tableName, partName, type, conf, msc);
Review comment:
All the `updateMetricsFrom*` methods are static. They are completely
stateless, and the outcome of the metrics computation is stored in the backend
DB, which is accessible by all the compaction threads regardless of which
process is hosting them.
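The thread-safety argument above (static, stateless updaters whose only output lands in a shared backend store) can be sketched as follows. This is a toy illustration, not Hive's actual `DeltaFilesMetricReporter` code; the class, method, and store names are all hypothetical, with a `ConcurrentHashMap` standing in for the backend DB.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class StatelessMetricsDemo {
  // Stands in for the backend DB that every compaction thread can reach,
  // regardless of which process hosts the thread.
  static final Map<String, Integer> METRICS_STORE = new ConcurrentHashMap<>();

  // Static and stateless: nothing is read or written besides the shared store,
  // so concurrent callers cannot corrupt any per-instance state.
  static void updateMetricsFromWorker(String dbName, String tableName, int deltaCount) {
    METRICS_STORE.merge(dbName + "." + tableName, deltaCount, Math::max);
  }

  public static void main(String[] args) throws InterruptedException {
    Thread t1 = new Thread(() -> updateMetricsFromWorker("default", "t", 5));
    Thread t2 = new Thread(() -> updateMetricsFromWorker("default", "t", 3));
    t1.start(); t2.start();
    t1.join(); t2.join();
    System.out.println(METRICS_STORE.get("default.t")); // prints 5: the larger of the two reported counts
  }
}
```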
##########
File path: service/src/java/org/apache/hive/service/server/HiveServer2.java
##########
@@ -214,9 +214,6 @@ public synchronized void init(HiveConf hiveConf) {
     try {
       if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_METRICS_ENABLED)) {
         MetricsFactory.init(hiveConf);
-        if (MetastoreConf.getBoolVar(hiveConf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) {
-          DeltaFilesMetricReporter.init(hiveConf);
Review comment:
Good catch!
##########
File path: standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
##########
@@ -661,6 +661,16 @@ CREATE TABLE COMPLETED_COMPACTIONS (
 CREATE INDEX COMPLETED_COMPACTIONS_RES ON COMPLETED_COMPACTIONS (CC_DATABASE,CC_TABLE,CC_PARTITION);
+-- HIVE-25842
+CREATE TABLE COMPACTION_METRICS_CACHE (
+  CMC_DATABASE varchar(128) NOT NULL,
+  CMC_TABLE varchar(128) NOT NULL,
+  CMC_PARTITION varchar(767),
+  CMC_METRIC_TYPE varchar(128) NOT NULL,
+  CMC_METRIC_VALUE integer NOT NULL,
Review comment:
I think if for some reason the value is null, that row shouldn't be in
the table.
##########
File path: standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
##########
@@ -2926,11 +2945,15 @@ PartitionsResponse get_partitions_req(1:PartitionsRequest req)
 void mark_cleaned(1:CompactionInfoStruct cr) throws(1:MetaException o1)
 void mark_compacted(1: CompactionInfoStruct cr) throws(1:MetaException o1)
 void mark_failed(1: CompactionInfoStruct cr) throws(1:MetaException o1)
+ CompactionMetricsDataResponse get_compaction_metrics_data(1: string dbName, 2: string tblName, 3: string partitionName, 4: CompactionMetricsMetricType type) throws(1:MetaException o1)
Review comment:
Yes, it makes sense to change the param to a request object.
##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
##########
@@ -4112,6 +4126,189 @@ public MetricsInfo getMetricsInfo() throws MetaException {
     }
   }
+  @Override
+  public CompactionMetricsData getCompactionMetricsData(String dbName, String tblName, String partitionName,
+      CompactionMetricsData.MetricType type) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        String query = SELECT_COMPACTION_METRICS_CACHE_QUERY;
+        if (partitionName != null) {
+          query += " AND \"CMC_PARTITION\" = ?";
+        } else {
+          query += " AND \"CMC_PARTITION\" IS NULL";
+        }
+        try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+          pstmt.setString(1, dbName);
+          pstmt.setString(2, tblName);
+          pstmt.setString(3, type.toString());
+          if (partitionName != null) {
+            pstmt.setString(4, partitionName);
+          }
+          ResultSet resultSet = pstmt.executeQuery();
+          CompactionMetricsData.Builder builder = new CompactionMetricsData.Builder();
+          if (resultSet.next()) {
+            return builder.dbName(dbName).tblName(tblName).partitionName(partitionName).metricType(type)
+                .metricValue(resultSet.getInt(1)).version(resultSet.getInt(2)).build();
+          } else {
+            return null;
+          }
+        }
+
+      } catch (SQLException e) {
+        LOG.error("Unable to getDeltaMetricsInfo");
+        checkRetryable(e, "getDeltaMetricsInfo");
+        throw new MetaException("Unable to execute getDeltaMetricsInfo()" + StringUtils.stringifyException(e));
Review comment:
The whole logic in `DeltaFilesMetricReporter` is wrapped in a huge try-catch block that catches every `Throwable`, so this shouldn't take down the compaction threads.
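The guard described in this comment can be sketched in isolation. The names below are hypothetical stand-ins (this is not the real `DeltaFilesMetricReporter` code); the point is only that catching `Throwable` around a best-effort metrics call keeps the hosting compaction thread alive.

```java
public class MetricsGuardDemo {
  static int survivedCycles = 0;

  // Stands in for a metrics update that may throw anything at runtime.
  static void riskyMetricsUpdate(int cycle) {
    if (cycle % 2 == 0) {
      throw new RuntimeException("backend DB unavailable");
    }
  }

  // The compaction cycle treats metrics as best-effort: every Throwable is
  // caught and logged rather than propagated to the worker thread.
  static void compactionCycle(int cycle) {
    try {
      riskyMetricsUpdate(cycle);
    } catch (Throwable t) {
      System.err.println("Metrics update failed, compaction continues: " + t.getMessage());
    }
    survivedCycles++; // reached even when the metrics call blew up
  }

  public static void main(String[] args) {
    for (int i = 0; i < 4; i++) {
      compactionCycle(i);
    }
    System.out.println(survivedCycles); // prints 4
  }
}
```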
##########
File path: common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsFactory.java
##########
@@ -34,10 +34,10 @@
/**
* Initializes static Metrics instance.
*/
- public synchronized static void init(HiveConf conf) throws Exception {
+ public synchronized static void init(Configuration conf) throws Exception {
Review comment:
Two reasons:
1. It's always better to code to the interface.
2. The Initiator, where this method is called, has an instance of HiveMetastoreConf.
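The "code to the interface" point can be illustrated with a toy sketch. The types below are simplified stand-ins, not Hive's real `Configuration`/`HiveConf`/`MetastoreConf` classes: by declaring the parameter as the common supertype, one `init`-style method serves callers holding either concrete config.

```java
import java.util.HashMap;
import java.util.Map;

public class CodeToInterfaceDemo {
  // Simplified stand-in for a generic configuration supertype.
  static class Configuration {
    private final Map<String, String> props = new HashMap<>();
    void set(String key, String value) { props.put(key, value); }
    boolean getBoolean(String key, boolean def) {
      return props.containsKey(key) ? Boolean.parseBoolean(props.get(key)) : def;
    }
  }

  // Two callers hold two different concrete config types...
  static class HiveConf extends Configuration { }
  static class MetastoreConf extends Configuration { }

  // ...but a method declared against the supertype accepts both.
  static boolean metricsEnabled(Configuration conf) {
    return conf.getBoolean("metrics.enabled", false);
  }

  public static void main(String[] args) {
    HiveConf hs2Conf = new HiveConf();
    hs2Conf.set("metrics.enabled", "true");
    MetastoreConf initiatorConf = new MetastoreConf();
    System.out.println(metricsEnabled(hs2Conf));       // prints true
    System.out.println(metricsEnabled(initiatorConf)); // prints false
  }
}
```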
##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
##########
@@ -308,7 +307,22 @@
       "SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" FROM \"TXN_COMPONENTS\" " +
           "INNER JOIN \"TXNS\" ON \"TC_TXNID\" = \"TXN_ID\" WHERE \"TXN_STATE\" = " + TxnStatus.ABORTED +
           " GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" HAVING COUNT(\"TXN_ID\") > ?";
-
+  private static final String SELECT_COMPACTION_METRICS_CACHE_QUERY =
+      "SELECT \"CMC_METRIC_VALUE\", \"CMC_VERSION\" FROM \"COMPACTION_METRICS_CACHE\" " +
+          "WHERE \"CMC_DATABASE\" = ? AND \"CMC_TABLE\" = ? AND \"CMC_METRIC_TYPE\" = ?";
+  private static final String NO_SELECT_COMPACTION_METRICS_CACHE_FOR_TYPE_QUERY =
+      "* FROM \"COMPACTION_METRICS_CACHE\" WHERE \"CMC_METRIC_TYPE\" = ? ORDER BY \"CMC_METRIC_VALUE\" DESC";
Review comment:
Correct. Changed it to select column names.
##########
File path: standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
##########
@@ -2926,11 +2945,15 @@ PartitionsResponse get_partitions_req(1:PartitionsRequest req)
 void mark_cleaned(1:CompactionInfoStruct cr) throws(1:MetaException o1)
 void mark_compacted(1: CompactionInfoStruct cr) throws(1:MetaException o1)
 void mark_failed(1: CompactionInfoStruct cr) throws(1:MetaException o1)
+ CompactionMetricsDataResponse get_compaction_metrics_data(1: string dbName, 2: string tblName, 3: string partitionName, 4: CompactionMetricsMetricType type) throws(1:MetaException o1)
+ bool update_compaction_metrics_data(1: CompactionMetricsDataStruct data, 2: i32 version) throws(1:MetaException o1)
+ void add_compaction_metrics_data(1: CompactionMetricsDataStruct data) throws(1:MetaException o1)
+ void remove_compaction_metrics_data(1: string dbName, 2: string tblName, 3: string partitionName, 4: CompactionMetricsMetricType type) throws(1:MetaException o1)
Review comment:
Done
##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
##########
@@ -4112,6 +4126,189 @@ public MetricsInfo getMetricsInfo() throws MetaException {
     }
   }
+  @Override
+  public CompactionMetricsData getCompactionMetricsData(String dbName, String tblName, String partitionName,
+      CompactionMetricsData.MetricType type) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        String query = SELECT_COMPACTION_METRICS_CACHE_QUERY;
+        if (partitionName != null) {
+          query += " AND \"CMC_PARTITION\" = ?";
+        } else {
+          query += " AND \"CMC_PARTITION\" IS NULL";
+        }
+        try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+          pstmt.setString(1, dbName);
+          pstmt.setString(2, tblName);
+          pstmt.setString(3, type.toString());
+          if (partitionName != null) {
+            pstmt.setString(4, partitionName);
+          }
+          ResultSet resultSet = pstmt.executeQuery();
+          CompactionMetricsData.Builder builder = new CompactionMetricsData.Builder();
+          if (resultSet.next()) {
+            return builder.dbName(dbName).tblName(tblName).partitionName(partitionName).metricType(type)
+                .metricValue(resultSet.getInt(1)).version(resultSet.getInt(2)).build();
+          } else {
+            return null;
+          }
+        }
+
+      } catch (SQLException e) {
+        LOG.error("Unable to getDeltaMetricsInfo");
+        checkRetryable(e, "getDeltaMetricsInfo");
+        throw new MetaException("Unable to execute getDeltaMetricsInfo()" + StringUtils.stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      return getCompactionMetricsData(dbName, tblName, partitionName, type);
+    }
+  }
+
+  @Override
+  public List<CompactionMetricsData> getTopCompactionMetricsDataPerType(int limit)
+      throws MetaException {
+    Connection dbConn = null;
+    List<CompactionMetricsData> metricsDataList = new ArrayList<>();
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        for (CompactionMetricsData.MetricType type : CompactionMetricsData.MetricType.values()) {
+          String query = sqlGenerator.addLimitClause(limit, NO_SELECT_COMPACTION_METRICS_CACHE_FOR_TYPE_QUERY);
+          try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+            pstmt.setString(1, type.toString());
+            ResultSet resultSet = pstmt.executeQuery();
+            while (resultSet.next()) {
+              CompactionMetricsData.Builder builder = new CompactionMetricsData.Builder();
+              metricsDataList.add(builder
+                  .dbName(resultSet.getString(1))
+                  .tblName(resultSet.getString(2))
+                  .partitionName(resultSet.getString(3))
+                  .metricType(type)
+                  .metricValue(resultSet.getInt(5))
+                  .version(resultSet.getInt(6))
+                  .build());
+            }
+          }
+        }
+      } catch (SQLException e) {
+        LOG.error("Unable to getCompactionMetricsDataForType");
+        checkRetryable(e, "getCompactionMetricsDataForType");
+        throw new MetaException("Unable to execute getCompactionMetricsDataForType()" + stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      return getTopCompactionMetricsDataPerType(limit);
+    }
+    return metricsDataList;
+  }
+
+  @Override
+  public boolean updateCompactionMetricsData(CompactionMetricsData data, int version) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        String query = UPDATE_COMPACTION_METRICS_CACHE_QUERY;
+        if (data.getPartitionName() != null) {
+          query += " AND \"CMC_PARTITION\" = ?";
+        } else {
+          query += " AND \"CMC_PARTITION\" IS NULL";
+        }
+        try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+          pstmt.setInt(1, data.getMetricValue());
+          pstmt.setInt(2, data.getVersion());
+          pstmt.setString(3, data.getDbName());
+          pstmt.setString(4, data.getTblName());
+          pstmt.setString(5, data.getMetricType().toString());
+          pstmt.setInt(6, version);
+          if (data.getPartitionName() != null) {
+            pstmt.setString(7, data.getPartitionName());
+          }
+          boolean updateRes = pstmt.executeUpdate() > 0;
+          dbConn.commit();
+          return updateRes;
+        }
+      } catch (SQLException e) {
+        rollbackDBConn(dbConn);
+        checkRetryable(e, "updateCompactionMetricsData(" + data + ", " + version + ")");
+        throw new MetaException("Unable to execute updateCompactionMetricsData()" + stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      updateCompactionMetricsData(data, version);
+    }
+    return true;
+  }
+
+  @Override
+  public void addCompactionMetricsData(CompactionMetricsData data) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        try (PreparedStatement pstmt = dbConn.prepareStatement(INSERT_COMPACTION_METRICS_CACHE_QUERY)) {
+          pstmt.setString(1, data.getDbName());
+          pstmt.setString(2, data.getTblName());
+          pstmt.setString(3, data.getPartitionName());
Review comment:
By definition, the `CMC_PARTITION` column accepts null values, and `PreparedStatement` converts Java nulls to database-specific null values.
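The version-guarded `UPDATE` in the `updateCompactionMetricsData` hunk above is classic optimistic concurrency: the write only lands if the caller still holds the latest version. A minimal in-memory sketch of the same idea (all names hypothetical, with a map standing in for the `COMPACTION_METRICS_CACHE` table):

```java
import java.util.concurrent.ConcurrentHashMap;

public class VersionedUpdateDemo {
  static final class Entry {
    final int value;
    final int version;
    Entry(int value, int version) { this.value = value; this.version = version; }
  }

  static final ConcurrentHashMap<String, Entry> CACHE = new ConcurrentHashMap<>();

  // Mirrors "UPDATE ... WHERE ... AND CMC_VERSION = ?": the write succeeds only
  // when the caller's version is still current; otherwise another writer won
  // and the caller should re-read and retry.
  static boolean updateIfVersionMatches(String key, int newValue, int expectedVersion) {
    Entry current = CACHE.get(key);
    if (current == null || current.version != expectedVersion) {
      return false; // stale read
    }
    return CACHE.replace(key, current, new Entry(newValue, expectedVersion + 1));
  }

  public static void main(String[] args) {
    CACHE.put("default.t.NUM_DELTAS", new Entry(10, 1));
    System.out.println(updateIfVersionMatches("default.t.NUM_DELTAS", 12, 1)); // prints true
    System.out.println(updateIfVersionMatches("default.t.NUM_DELTAS", 15, 1)); // prints false: version moved on
  }
}
```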
##########
File path: standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
##########
@@ -2926,11 +2945,15 @@ PartitionsResponse get_partitions_req(1:PartitionsRequest req)
 void mark_cleaned(1:CompactionInfoStruct cr) throws(1:MetaException o1)
 void mark_compacted(1: CompactionInfoStruct cr) throws(1:MetaException o1)
 void mark_failed(1: CompactionInfoStruct cr) throws(1:MetaException o1)
+ CompactionMetricsDataResponse get_compaction_metrics_data(1: string dbName, 2: string tblName, 3: string partitionName, 4: CompactionMetricsMetricType type) throws(1:MetaException o1)
+ bool update_compaction_metrics_data(1: CompactionMetricsDataStruct data, 2: i32 version) throws(1:MetaException o1)
Review comment:
I moved the update and add together.
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -396,7 +423,9 @@ private boolean removeFiles(String location, ValidWriteIdList writeIdList, Compa
     }
     StringBuilder extraDebugInfo = new StringBuilder("[").append(obsoleteDirs.stream()
         .map(Path::getName).collect(Collectors.joining(",")));
-    return remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
+    boolean success = remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
+    updateDeltaFilesMetrics(ci.dbname, ci.tableName, ci.partName, dir.getObsolete().size());
+    return success;
Review comment:
`dir.getObsolete()` doesn't include the aborted files. We have a different method for that: `dir.getAbortedDirectories()`.
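To make the distinction concrete: counting only the obsolete directories under-reports the cleanup whenever aborted deltas were also removed. A toy sketch (the getter names mirror `AcidDirectory`'s, but the `Dir` type here is a mock, not Hive's class):

```java
import java.util.Arrays;
import java.util.List;

public class CleanupCountDemo {
  // Mocked stand-in for AcidDirectory: obsolete and aborted dirs are tracked separately.
  static final class Dir {
    final List<String> obsolete;
    final List<String> aborted;
    Dir(List<String> obsolete, List<String> aborted) {
      this.obsolete = obsolete;
      this.aborted = aborted;
    }
    List<String> getObsolete() { return obsolete; }
    List<String> getAbortedDirectories() { return aborted; }
  }

  // Both lists must be counted to reflect everything the Cleaner removed.
  static int removedDirCount(Dir dir) {
    return dir.getObsolete().size() + dir.getAbortedDirectories().size();
  }

  public static void main(String[] args) {
    Dir dir = new Dir(Arrays.asList("delta_1_1", "delta_2_2"), Arrays.asList("delta_3_3"));
    System.out.println(removedDirCount(dir)); // prints 3, not the 2 that getObsolete() alone reports
  }
}
```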
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -87,14 +106,15 @@ public void init(AtomicBoolean stop) throws Exception {
     cleanerExecutor = CompactorUtil.createExecutorWithThreadFactory(
         conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_THREADS_NUM),
         COMPACTOR_CLEANER_THREAD_NAME_FORMAT);
+    metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&
Review comment:
Yes, this was intentional
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -396,7 +423,9 @@ private boolean removeFiles(String location, ValidWriteIdList writeIdList, Compa
     }
     StringBuilder extraDebugInfo = new StringBuilder("[").append(obsoleteDirs.stream()
         .map(Path::getName).collect(Collectors.joining(",")));
-    return remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
+    boolean success = remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
+    updateDeltaFilesMetrics(ci.dbname, ci.tableName, ci.partName, dir.getObsolete().size());
Review comment:
Good idea!
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -396,7 +423,9 @@ private boolean removeFiles(String location, ValidWriteIdList writeIdList, Compa
     }
     StringBuilder extraDebugInfo = new StringBuilder("[").append(obsoleteDirs.stream()
         .map(Path::getName).collect(Collectors.joining(",")));
-    return remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
+    boolean success = remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
+    updateDeltaFilesMetrics(ci.dbname, ci.tableName, ci.partName, dir.getObsolete().size());
Review comment:
The soft-drop partition path is taken when the partition was dropped before the Cleaner could clean it. Since the partition was already dropped, `TxnHandler.cleanupRecords` must have been called, which removes all the records belonging to that particular partition.
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
##########
@@ -381,17 +397,19 @@ public CompactionType run() throws Exception {
     }
   }
-  private CompactionType determineCompactionType(CompactionInfo ci, ValidWriteIdList writeIds,
-      StorageDescriptor sd, Map<String, String> tblproperties)
+  private AcidDirectory getAcidDirectory(StorageDescriptor sd, ValidWriteIdList writeIds) throws IOException {
Review comment:
fixed
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
##########
@@ -331,6 +339,12 @@ private boolean foundCurrentOrFailedCompactions(ShowCompactResponse compactions,
     }
     return false;
   }
+
+  private void updateDeltaFilesMetrics(AcidDirectory directory, String dbName, String tableName, String partName) {
Review comment:
The two methods have different signatures.
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -671,6 +679,13 @@ private String getWorkerId() {
     return name.toString();
   }
+  private void updateDeltaFilesMetrics(AcidDirectory directory, String dbName, String tableName, String partName,
Review comment:
I will create a connection pool in a follow-up PR.
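A follow-up pool could look roughly like the stdlib-only sketch below. This is a generic illustration of the pooling idea, not a proposal for Hive's actual implementation (which would presumably reuse an established pool such as HikariCP or DBCP); the class and method names are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;

public class SimplePool<T> {
  private final BlockingQueue<T> idle;

  public SimplePool(int size, Supplier<T> factory) {
    idle = new ArrayBlockingQueue<>(size);
    for (int i = 0; i < size; i++) {
      idle.add(factory.get()); // pre-create all pooled resources
    }
  }

  // Blocks until a resource is free, bounding concurrent use at the pool size.
  public T borrow() throws InterruptedException {
    return idle.take();
  }

  // Hands the resource back so another caller can take it.
  public void release(T resource) {
    idle.offer(resource);
  }

  public int available() {
    return idle.size();
  }

  public static void main(String[] args) throws InterruptedException {
    SimplePool<StringBuilder> pool = new SimplePool<>(2, StringBuilder::new);
    StringBuilder conn = pool.borrow();
    System.out.println(pool.available()); // prints 1
    pool.release(conn);
    System.out.println(pool.available()); // prints 2
  }
}
```

The blocking `borrow()` gives natural back-pressure: threads wait for a connection instead of opening unbounded new ones.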
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]