This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a0ca3eadba5 [HUDI-6316] Adding corrupted and rollback log blocks
metrics (#8881)
a0ca3eadba5 is described below
commit a0ca3eadba5a041aaaf97d040adffc017cfdd285
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sun Jul 9 23:10:43 2023 -0400
[HUDI-6316] Adding corrupted and rollback log blocks metrics (#8881)
Adding log block metrics to track corrupted log blocks and rollback log blocks.
Users need to set `hoodie.metrics.compaction.log.blocks.on` to true
to enable these metrics.
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 7 ++
.../hudi/config/metrics/HoodieMetricsConfig.java | 11 +++
.../org/apache/hudi/metrics/HoodieMetrics.java | 79 ++++++++++++++--------
.../org/apache/hudi/metrics/TestHoodieMetrics.java | 41 ++++++-----
.../hudi/common/model/HoodieCommitMetadata.java | 20 ++++++
5 files changed, 113 insertions(+), 45 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 93105491180..1390408d901 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2088,6 +2088,13 @@ public class HoodieWriteConfig extends HoodieConfig {
return getBoolean(HoodieMetricsConfig.TURN_METRICS_ON);
}
+ /**
+ * Whether corrupted/rollback log block metrics are reported (see TURN_METRICS_COMPACTION_LOG_BLOCKS_ON).
+ */
+ public boolean isCompactionLogBlockMetricsOn() {
+ return
getBoolean(HoodieMetricsConfig.TURN_METRICS_COMPACTION_LOG_BLOCKS_ON);
+ }
+
public boolean isExecutorMetricsEnabled() {
return Boolean.parseBoolean(
getStringOrDefault(HoodieMetricsConfig.EXECUTOR_METRICS_ENABLE,
"false"));
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
index 9fe9b33a546..e1d0afeb6fa 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
@@ -106,6 +106,12 @@ public class HoodieMetricsConfig extends HoodieConfig {
.sinceVersion("0.14.0")
.withDocumentation("Comma separated list of config file paths for metric
exporter configs");
+ public static final ConfigProperty<Boolean>
TURN_METRICS_COMPACTION_LOG_BLOCKS_ON = ConfigProperty
+ .key(METRIC_PREFIX + "compaction.log.blocks.on")
+ .defaultValue(false)
+ .sinceVersion("0.14.0")
+ .withDocumentation("Turn on/off metrics reporting for log blocks with
compaction commit. off by default.");
+
/**
* @deprecated Use {@link #TURN_METRICS_ON} and its methods instead
*/
@@ -171,6 +177,11 @@ public class HoodieMetricsConfig extends HoodieConfig {
return this;
}
+ public Builder compactionLogBlocksEnable(boolean
compactionLogBlockMetricsEnable) { // sets TURN_METRICS_COMPACTION_LOG_BLOCKS_ON; the flag is stored as its string form
+ hoodieMetricsConfig.setValue(TURN_METRICS_COMPACTION_LOG_BLOCKS_ON,
String.valueOf(compactionLogBlockMetricsEnable));
+ return this;
+ }
+
public Builder withReporterType(String reporterType) {
hoodieMetricsConfig.setValue(METRICS_REPORTER_TYPE_VALUE, reporterType);
return this;
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
index dac680a5c40..792d0cd0844 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
@@ -37,6 +37,23 @@ public class HoodieMetrics {
private static final Logger LOG =
LoggerFactory.getLogger(HoodieMetrics.class);
+ public static final String TOTAL_PARTITIONS_WRITTEN_STR =
"totalPartitionsWritten";
+ public static final String TOTAL_FILES_INSERT_STR = "totalFilesInsert";
+ public static final String TOTAL_FILES_UPDATE_STR = "totalFilesUpdate";
+ public static final String TOTAL_RECORDS_WRITTEN_STR = "totalRecordsWritten";
+ public static final String TOTAL_UPDATE_RECORDS_WRITTEN_STR =
"totalUpdateRecordsWritten";
+ public static final String TOTAL_INSERT_RECORDS_WRITTEN_STR =
"totalInsertRecordsWritten";
+ public static final String TOTAL_BYTES_WRITTEN_STR = "totalBytesWritten";
+ public static final String TOTAL_SCAN_TIME_STR = "totalScanTime";
+ public static final String TOTAL_CREATE_TIME_STR = "totalCreateTime";
+ public static final String TOTAL_UPSERT_TIME_STR = "totalUpsertTime";
+ public static final String TOTAL_COMPACTED_RECORDS_UPDATED_STR =
"totalCompactedRecordsUpdated";
+ public static final String TOTAL_LOG_FILES_COMPACTED_STR =
"totalLogFilesCompacted";
+ public static final String TOTAL_LOG_FILES_SIZE_STR = "totalLogFilesSize";
+ public static final String TOTAL_RECORDS_DELETED = "totalRecordsDeleted";
+ public static final String TOTAL_CORRUPTED_LOG_BLOCKS_STR =
"totalCorruptedLogBlocks";
+ public static final String TOTAL_ROLLBACK_LOG_BLOCKS_STR =
"totalRollbackLogBlocks";
+
private Metrics metrics;
// Some timers
public String rollbackTimerName = null;
@@ -175,20 +192,20 @@ public class HoodieMetrics {
// No-op if metrics are not of type PROMETHEUS_PUSHGATEWAY.
return;
}
- metrics.registerGauge(getMetricsName(actionType,
"totalPartitionsWritten"), 0);
- metrics.registerGauge(getMetricsName(actionType, "totalFilesInsert"), 0);
- metrics.registerGauge(getMetricsName(actionType, "totalFilesUpdate"), 0);
- metrics.registerGauge(getMetricsName(actionType, "totalRecordsWritten"),
0);
- metrics.registerGauge(getMetricsName(actionType,
"totalUpdateRecordsWritten"), 0);
- metrics.registerGauge(getMetricsName(actionType,
"totalInsertRecordsWritten"), 0);
- metrics.registerGauge(getMetricsName(actionType, "totalRecordsDeleted"),
0);
- metrics.registerGauge(getMetricsName(actionType, "totalBytesWritten"), 0);
- metrics.registerGauge(getMetricsName(actionType, "totalScanTime"), 0);
- metrics.registerGauge(getMetricsName(actionType, "totalCreateTime"), 0);
- metrics.registerGauge(getMetricsName(actionType, "totalUpsertTime"), 0);
- metrics.registerGauge(getMetricsName(actionType,
"totalCompactedRecordsUpdated"), 0);
- metrics.registerGauge(getMetricsName(actionType,
"totalLogFilesCompacted"), 0);
- metrics.registerGauge(getMetricsName(actionType, "totalLogFilesSize"), 0);
+ metrics.registerGauge(getMetricsName(actionType,
TOTAL_PARTITIONS_WRITTEN_STR), 0);
+ metrics.registerGauge(getMetricsName(actionType, TOTAL_FILES_INSERT_STR),
0);
+ metrics.registerGauge(getMetricsName(actionType, TOTAL_FILES_UPDATE_STR),
0);
+ metrics.registerGauge(getMetricsName(actionType,
TOTAL_RECORDS_WRITTEN_STR), 0);
+ metrics.registerGauge(getMetricsName(actionType,
TOTAL_UPDATE_RECORDS_WRITTEN_STR), 0);
+ metrics.registerGauge(getMetricsName(actionType,
TOTAL_INSERT_RECORDS_WRITTEN_STR), 0);
+ metrics.registerGauge(getMetricsName(actionType, TOTAL_RECORDS_DELETED),
0);
+ metrics.registerGauge(getMetricsName(actionType, TOTAL_BYTES_WRITTEN_STR),
0);
+ metrics.registerGauge(getMetricsName(actionType, TOTAL_SCAN_TIME_STR), 0);
+ metrics.registerGauge(getMetricsName(actionType, TOTAL_CREATE_TIME_STR),
0);
+ metrics.registerGauge(getMetricsName(actionType, TOTAL_UPSERT_TIME_STR),
0);
+ metrics.registerGauge(getMetricsName(actionType,
TOTAL_COMPACTED_RECORDS_UPDATED_STR), 0);
+ metrics.registerGauge(getMetricsName(actionType,
TOTAL_LOG_FILES_COMPACTED_STR), 0);
+ metrics.registerGauge(getMetricsName(actionType,
TOTAL_LOG_FILES_SIZE_STR), 0);
}
public void updateCommitMetrics(long commitEpochTimeInMs, long durationInMs,
HoodieCommitMetadata metadata,
@@ -209,20 +226,26 @@ public class HoodieMetrics {
long totalCompactedRecordsUpdated =
metadata.getTotalCompactedRecordsUpdated();
long totalLogFilesCompacted = metadata.getTotalLogFilesCompacted();
long totalLogFilesSize = metadata.getTotalLogFilesSize();
- metrics.registerGauge(getMetricsName(actionType,
"totalPartitionsWritten"), totalPartitionsWritten);
- metrics.registerGauge(getMetricsName(actionType, "totalFilesInsert"),
totalFilesInsert);
- metrics.registerGauge(getMetricsName(actionType, "totalFilesUpdate"),
totalFilesUpdate);
- metrics.registerGauge(getMetricsName(actionType, "totalRecordsWritten"),
totalRecordsWritten);
- metrics.registerGauge(getMetricsName(actionType,
"totalUpdateRecordsWritten"), totalUpdateRecordsWritten);
- metrics.registerGauge(getMetricsName(actionType,
"totalInsertRecordsWritten"), totalInsertRecordsWritten);
- metrics.registerGauge(getMetricsName(actionType, "totalBytesWritten"),
totalBytesWritten);
- metrics.registerGauge(getMetricsName(actionType, "totalScanTime"),
totalTimeTakenByScanner);
- metrics.registerGauge(getMetricsName(actionType, "totalCreateTime"),
totalTimeTakenForInsert);
- metrics.registerGauge(getMetricsName(actionType, "totalUpsertTime"),
totalTimeTakenForUpsert);
- metrics.registerGauge(getMetricsName(actionType,
"totalCompactedRecordsUpdated"), totalCompactedRecordsUpdated);
- metrics.registerGauge(getMetricsName(actionType,
"totalLogFilesCompacted"), totalLogFilesCompacted);
- metrics.registerGauge(getMetricsName(actionType, "totalLogFilesSize"),
totalLogFilesSize);
- metrics.registerGauge(getMetricsName(actionType, "totalRecordsDeleted"),
totalRecordsDeleted);
+ metrics.registerGauge(getMetricsName(actionType,
TOTAL_PARTITIONS_WRITTEN_STR), totalPartitionsWritten);
+ metrics.registerGauge(getMetricsName(actionType,
TOTAL_FILES_INSERT_STR), totalFilesInsert);
+ metrics.registerGauge(getMetricsName(actionType,
TOTAL_FILES_UPDATE_STR), totalFilesUpdate);
+ metrics.registerGauge(getMetricsName(actionType,
TOTAL_RECORDS_WRITTEN_STR), totalRecordsWritten);
+ metrics.registerGauge(getMetricsName(actionType,
TOTAL_UPDATE_RECORDS_WRITTEN_STR), totalUpdateRecordsWritten);
+ metrics.registerGauge(getMetricsName(actionType,
TOTAL_INSERT_RECORDS_WRITTEN_STR), totalInsertRecordsWritten);
+ metrics.registerGauge(getMetricsName(actionType,
TOTAL_BYTES_WRITTEN_STR), totalBytesWritten);
+ metrics.registerGauge(getMetricsName(actionType, TOTAL_SCAN_TIME_STR),
totalTimeTakenByScanner);
+ metrics.registerGauge(getMetricsName(actionType, TOTAL_CREATE_TIME_STR),
totalTimeTakenForInsert);
+ metrics.registerGauge(getMetricsName(actionType, TOTAL_UPSERT_TIME_STR),
totalTimeTakenForUpsert);
+ metrics.registerGauge(getMetricsName(actionType,
TOTAL_COMPACTED_RECORDS_UPDATED_STR), totalCompactedRecordsUpdated);
+ metrics.registerGauge(getMetricsName(actionType,
TOTAL_LOG_FILES_COMPACTED_STR), totalLogFilesCompacted);
+ metrics.registerGauge(getMetricsName(actionType,
TOTAL_LOG_FILES_SIZE_STR), totalLogFilesSize);
+ metrics.registerGauge(getMetricsName(actionType, TOTAL_RECORDS_DELETED),
totalRecordsDeleted);
+ if (config.isCompactionLogBlockMetricsOn()) {
+ long totalCorruptedLogBlocks = metadata.getTotalCorruptLogBlocks();
+ long totalRollbackLogBlocks = metadata.getTotalRollbackLogBlocks();
+ metrics.registerGauge(getMetricsName(actionType,
TOTAL_CORRUPTED_LOG_BLOCKS_STR), totalCorruptedLogBlocks);
+ metrics.registerGauge(getMetricsName(actionType,
TOTAL_ROLLBACK_LOG_BLOCKS_STR), totalRollbackLogBlocks);
+ }
}
}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
index 1598810ce42..f305c9d1776 100755
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
@@ -72,7 +72,6 @@ public class TestHoodieMetrics {
@Test
public void testTimerCtx() throws InterruptedException {
Random rand = new Random();
-
// Index metrics
Timer.Context timer = hoodieMetrics.getIndexCtx();
Thread.sleep(5); // Ensure timer duration is > 0
@@ -141,42 +140,50 @@ public class TestHoodieMetrics {
when(metadata.getTotalLogFilesCompacted()).thenReturn(randomValue + 12);
when(metadata.getTotalLogFilesSize()).thenReturn(randomValue + 13);
when(metadata.getTotalRecordsDeleted()).thenReturn(randomValue + 14);
+ when(metadata.getTotalCorruptLogBlocks()).thenReturn(randomValue + 15);
+ when(metadata.getTotalRollbackLogBlocks()).thenReturn(randomValue + 16);
when(metadata.getMinAndMaxEventTime()).thenReturn(Pair.of(Option.empty(),
Option.empty()));
- hoodieMetrics.updateCommitMetrics(randomValue + 15, commitTimer.stop(),
metadata, action);
+ when(config.isCompactionLogBlockMetricsOn()).thenReturn(true);
+
+ hoodieMetrics.updateCommitMetrics(randomValue + 17, commitTimer.stop(),
metadata, action);
String metricname = hoodieMetrics.getMetricsName(action, "duration");
long duration =
(Long)metrics.getRegistry().getGauges().get(metricname).getValue();
assertTrue(duration > 0);
- metricname = hoodieMetrics.getMetricsName(action,
"totalPartitionsWritten");
+ metricname = hoodieMetrics.getMetricsName(action,
HoodieMetrics.TOTAL_PARTITIONS_WRITTEN_STR);
assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(),
metadata.fetchTotalPartitionsWritten());
- metricname = hoodieMetrics.getMetricsName(action, "totalFilesInsert");
+ metricname = hoodieMetrics.getMetricsName(action,
HoodieMetrics.TOTAL_FILES_INSERT_STR);
assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(),
metadata.fetchTotalFilesInsert());
- metricname = hoodieMetrics.getMetricsName(action, "totalFilesUpdate");
+ metricname = hoodieMetrics.getMetricsName(action,
HoodieMetrics.TOTAL_FILES_UPDATE_STR);
assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(),
metadata.fetchTotalFilesUpdated());
- metricname = hoodieMetrics.getMetricsName(action, "totalRecordsWritten");
+ metricname = hoodieMetrics.getMetricsName(action,
HoodieMetrics.TOTAL_RECORDS_WRITTEN_STR);
assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(),
metadata.fetchTotalRecordsWritten());
- metricname = hoodieMetrics.getMetricsName(action,
"totalUpdateRecordsWritten");
+ metricname = hoodieMetrics.getMetricsName(action,
HoodieMetrics.TOTAL_UPDATE_RECORDS_WRITTEN_STR);
assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(),
metadata.fetchTotalUpdateRecordsWritten());
- metricname = hoodieMetrics.getMetricsName(action,
"totalInsertRecordsWritten");
+ metricname = hoodieMetrics.getMetricsName(action,
HoodieMetrics.TOTAL_INSERT_RECORDS_WRITTEN_STR);
assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(),
metadata.fetchTotalInsertRecordsWritten());
- metricname = hoodieMetrics.getMetricsName(action, "totalBytesWritten");
+ metricname = hoodieMetrics.getMetricsName(action,
HoodieMetrics.TOTAL_BYTES_WRITTEN_STR);
assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(),
metadata.fetchTotalBytesWritten());
metricname = hoodieMetrics.getMetricsName(action, "commitTime");
-
assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(),
randomValue + 15);
- metricname = hoodieMetrics.getMetricsName(action, "totalScanTime");
+
assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(),
randomValue + 17);
+ metricname = hoodieMetrics.getMetricsName(action,
HoodieMetrics.TOTAL_SCAN_TIME_STR);
assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(),
metadata.getTotalScanTime());
- metricname = hoodieMetrics.getMetricsName(action, "totalCreateTime");
+ metricname = hoodieMetrics.getMetricsName(action,
HoodieMetrics.TOTAL_CREATE_TIME_STR);
assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(),
metadata.getTotalCreateTime());
- metricname = hoodieMetrics.getMetricsName(action, "totalUpsertTime");
+ metricname = hoodieMetrics.getMetricsName(action,
HoodieMetrics.TOTAL_UPSERT_TIME_STR);
assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(),
metadata.getTotalUpsertTime());
- metricname = hoodieMetrics.getMetricsName(action,
"totalCompactedRecordsUpdated");
+ metricname = hoodieMetrics.getMetricsName(action,
HoodieMetrics.TOTAL_COMPACTED_RECORDS_UPDATED_STR);
assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(),
metadata.getTotalCompactedRecordsUpdated());
- metricname = hoodieMetrics.getMetricsName(action,
"totalLogFilesCompacted");
+ metricname = hoodieMetrics.getMetricsName(action,
HoodieMetrics.TOTAL_LOG_FILES_COMPACTED_STR);
assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(),
metadata.getTotalLogFilesCompacted());
- metricname = hoodieMetrics.getMetricsName(action, "totalLogFilesSize");
+ metricname = hoodieMetrics.getMetricsName(action,
HoodieMetrics.TOTAL_LOG_FILES_SIZE_STR);
assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(),
metadata.getTotalLogFilesSize());
- metricname = hoodieMetrics.getMetricsName(action, "totalRecordsDeleted");
+ metricname = hoodieMetrics.getMetricsName(action,
HoodieMetrics.TOTAL_RECORDS_DELETED);
assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(),
metadata.getTotalRecordsDeleted());
+ metricname = hoodieMetrics.getMetricsName(action,
HoodieMetrics.TOTAL_CORRUPTED_LOG_BLOCKS_STR);
+
assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(),
metadata.getTotalCorruptLogBlocks());
+ metricname = hoodieMetrics.getMetricsName(action,
HoodieMetrics.TOTAL_ROLLBACK_LOG_BLOCKS_STR);
+
assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(),
metadata.getTotalRollbackLogBlocks());
});
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
index 1ed8ff241dc..795e6cfe7a6 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
@@ -398,6 +398,26 @@ public class HoodieCommitMetadata implements Serializable {
return totalUpdateRecords;
}
+ public Long getTotalCorruptLogBlocks() { // sums getTotalCorruptLogBlock() over every write stat of every partition
+ Long totalCorruptedLogBlocks = 0L; // NOTE(review): boxed accumulator is re-boxed on each +=; same pattern as getTotalLogFilesSize()
+ for (Map.Entry<String, List<HoodieWriteStat>> entry :
partitionToWriteStats.entrySet()) {
+ for (HoodieWriteStat writeStat : entry.getValue()) {
+ totalCorruptedLogBlocks += writeStat.getTotalCorruptLogBlock();
+ }
+ }
+ return totalCorruptedLogBlocks; // stays 0 when there are no write stats
+ }
+
+ public Long getTotalRollbackLogBlocks() { // sums getTotalRollbackBlocks() over every write stat of every partition
+ Long totalRollbackLogBlocks = 0L; // NOTE(review): boxed accumulator is re-boxed on each +=; same pattern as getTotalLogFilesSize()
+ for (Map.Entry<String, List<HoodieWriteStat>> entry :
partitionToWriteStats.entrySet()) {
+ for (HoodieWriteStat writeStat : entry.getValue()) {
+ totalRollbackLogBlocks += writeStat.getTotalRollbackBlocks();
+ }
+ }
+ return totalRollbackLogBlocks; // stays 0 when there are no write stats
+ }
+
public Long getTotalLogFilesSize() {
Long totalLogFilesSize = 0L;
for (Map.Entry<String, List<HoodieWriteStat>> entry :
partitionToWriteStats.entrySet()) {