This is an automated email from the ASF dual-hosted git repository.
zhangyue19921010 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 422f510ebd4 [HUDI-8882] Support Compaction/Rollback/Clean Timeline
Instant Metrics in HoodieMetrics (#12681)
422f510ebd4 is described below
commit 422f510ebd41701a34a9f9e83787f6b9d021690e
Author: fhan <[email protected]>
AuthorDate: Wed Feb 19 16:58:58 2025 +0800
[HUDI-8882] Support Compaction/Rollback/Clean Timeline Instant Metrics in
HoodieMetrics (#12681)
* [HUDI-8882] timeline instant metrics for
clean&compaction&clustering&rollback
* update method name & add doc & change to switch type in
updateLatestCompletedInstant
---------
Co-authored-by: fhan <[email protected]>
---
.../apache/hudi/client/BaseHoodieWriteClient.java | 2 +-
.../org/apache/hudi/metrics/HoodieMetrics.java | 114 +++++++++++++++++----
.../org/apache/hudi/metrics/TestHoodieMetrics.java | 74 ++++++++++---
3 files changed, 156 insertions(+), 34 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 4b3f90c43f3..b8069bf4735 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -564,7 +564,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
// Delete the marker directory for the instant.
WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
-
metrics.updateClusteringTimeLineInstantMetrics(table.getActiveTimeline());
+ metrics.updateTableServiceInstantMetrics(table.getActiveTimeline());
} finally {
this.heartbeatClient.stop(instantTime);
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
index 104cbfed9c5..8f316782dcd 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.VisibleForTesting;
@@ -34,6 +35,8 @@ import com.codahale.metrics.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Set;
+
/**
* Wrapper for metrics-related operations.
*/
@@ -65,9 +68,18 @@ public class HoodieMetrics {
public static final String COMMIT_LATENCY_IN_MS_STR = "commitLatencyInMs";
public static final String COMMIT_FRESHNESS_IN_MS_STR =
"commitFreshnessInMs";
public static final String COMMIT_TIME_STR = "commitTime";
- public static final String EARLIEST_INFLIGHT_CLUSTERING_INSTANT_STR =
"earliestInflightClusteringInstant";
+ public static final String EARLIEST_PENDING_CLUSTERING_INSTANT_STR =
"earliestInflightClusteringInstant";
+ public static final String EARLIEST_PENDING_COMPACTION_INSTANT_STR =
"earliestInflightCompactionInstant";
+ public static final String EARLIEST_PENDING_CLEAN_INSTANT_STR =
"earliestInflightCleanInstant";
+ public static final String EARLIEST_PENDING_ROLLBACK_INSTANT_STR =
"earliestInflightRollbackInstant";
public static final String LATEST_COMPLETED_CLUSTERING_INSTANT_STR =
"latestCompletedClusteringInstant";
+ public static final String LATEST_COMPLETED_COMPACTION_INSTANT_STR =
"latestCompletedCompactionInstant";
+ public static final String LATEST_COMPLETED_CLEAN_INSTANT_STR =
"latestCompletedCleanInstant";
+ public static final String LATEST_COMPLETED_ROLLBACK_INSTANT_STR =
"latestCompletedRollbackInstant";
public static final String PENDING_CLUSTERING_INSTANT_COUNT_STR =
"pendingClusteringInstantCount";
+ public static final String PENDING_COMPACTION_INSTANT_COUNT_STR =
"pendingCompactionInstantCount";
+ public static final String PENDING_CLEAN_INSTANT_COUNT_STR =
"pendingCleanInstantCount";
+ public static final String PENDING_ROLLBACK_INSTANT_COUNT_STR =
"pendingRollbackInstantCount";
public static final String SUCCESS_EXTENSION = ".success";
public static final String FAILURE_EXTENSION = ".failure";
@@ -383,27 +395,89 @@ public class HoodieMetrics {
reportMetrics(HoodieTimeline.CLUSTERING_ACTION, "fileCreationTime",
durationInMs);
}
- public void updateClusteringTimeLineInstantMetrics(final
HoodieActiveTimeline activeTimeline) {
- if (config.isMetricsOn()) {
- // Compute Metrics
- long pendingClusteringInstantCount =
activeTimeline.filterPendingClusteringTimeline().getInstants().size();
- long earliestInflightClusteringInstantLong = 0L;
- Option<HoodieInstant> earliestInflightClusteringInstant =
activeTimeline.filterPendingClusteringTimeline().getFirstPendingClusterInstant();
- if (earliestInflightClusteringInstant.isPresent()) {
- earliestInflightClusteringInstantLong =
Long.valueOf(earliestInflightClusteringInstant.get().requestedTime());
- }
- long latestCompletedClusteringInstantLong = 0L;
- Option<HoodieInstant> latestCompletedClusteringInstant =
activeTimeline.filterCompletedInstants().getLastClusteringInstant();
- if (latestCompletedClusteringInstant.isPresent()) {
- latestCompletedClusteringInstantLong =
Long.valueOf(latestCompletedClusteringInstant.get().requestedTime());
- }
+  /**
+   * Refreshes the timeline instant metrics (earliest pending instant, latest completed instant and
+   * pending instant count) of the clustering, compaction, clean and rollback table services.
+   *
+   * <p>Returns immediately when metrics are disabled: every helper below filters the active
+   * timeline, so there is no point doing twelve scans for values that would never be reported.
+   *
+   * @param activeTimeline active timeline the metrics are derived from
+   */
+  public void updateTableServiceInstantMetrics(final HoodieActiveTimeline activeTimeline) {
+    if (!config.isMetricsOn()) {
+      return;
+    }
+    updateEarliestPendingInstant(activeTimeline, EARLIEST_PENDING_CLUSTERING_INSTANT_STR, HoodieTimeline.CLUSTERING_ACTION);
+    updateEarliestPendingInstant(activeTimeline, EARLIEST_PENDING_COMPACTION_INSTANT_STR, HoodieTimeline.COMPACTION_ACTION);
+    updateEarliestPendingInstant(activeTimeline, EARLIEST_PENDING_CLEAN_INSTANT_STR, HoodieTimeline.CLEAN_ACTION);
+    updateEarliestPendingInstant(activeTimeline, EARLIEST_PENDING_ROLLBACK_INSTANT_STR, HoodieTimeline.ROLLBACK_ACTION);
+
+    // Completed clustering/compaction instants are looked up by their completed action
+    // (replacecommit/commit), which is also the action their gauges are registered under.
+    updateLatestCompletedInstant(activeTimeline, LATEST_COMPLETED_CLUSTERING_INSTANT_STR, HoodieTimeline.REPLACE_COMMIT_ACTION);
+    updateLatestCompletedInstant(activeTimeline, LATEST_COMPLETED_COMPACTION_INSTANT_STR, HoodieTimeline.COMMIT_ACTION);
+    updateLatestCompletedInstant(activeTimeline, LATEST_COMPLETED_CLEAN_INSTANT_STR, HoodieTimeline.CLEAN_ACTION);
+    updateLatestCompletedInstant(activeTimeline, LATEST_COMPLETED_ROLLBACK_INSTANT_STR, HoodieTimeline.ROLLBACK_ACTION);
+
+    updatePendingInstantCount(activeTimeline, PENDING_CLUSTERING_INSTANT_COUNT_STR, HoodieTimeline.CLUSTERING_ACTION);
+    updatePendingInstantCount(activeTimeline, PENDING_COMPACTION_INSTANT_COUNT_STR, HoodieTimeline.COMPACTION_ACTION);
+    updatePendingInstantCount(activeTimeline, PENDING_CLEAN_INSTANT_COUNT_STR, HoodieTimeline.CLEAN_ACTION);
+    updatePendingInstantCount(activeTimeline, PENDING_ROLLBACK_INSTANT_COUNT_STR, HoodieTimeline.ROLLBACK_ACTION);
+  }
+
+  /**
+   * Reports the requested time of the earliest pending (requested or inflight) instant of the given
+   * action, i.e. the plan the table service is currently blocked on.
+   *
+   * <p>When no instant of the action is pending, {@code 0} is reported. Without this, the gauge would
+   * keep the value from an earlier call after all pending instants have finished. {@code 0} is also
+   * the default the previous clustering-only implementation reported.
+   *
+   * @param activeTimeline active timeline to inspect
+   * @param metricName     name of the metric to update
+   * @param action         pending action to look for, e.g. {@link HoodieTimeline#CLUSTERING_ACTION}
+   */
+  private void updateEarliestPendingInstant(final HoodieActiveTimeline activeTimeline,
+                                            final String metricName,
+                                            final String action) {
+    Set<String> validActions = CollectionUtils.createSet(action);
+    HoodieTimeline filteredInstants = activeTimeline.filterInflightsAndRequested().filter(instant -> validActions.contains(instant.getAction()));
+    Option<HoodieInstant> hoodieInstantOption = filteredInstants.firstInstant();
+    long earliestPendingInstant = hoodieInstantOption.isPresent()
+        ? Long.parseLong(hoodieInstantOption.get().requestedTime())
+        : 0L;
+    updateMetric(action, metricName, earliestPendingInstant);
+  }
+
+  /**
+   * Reports the requested time of the latest completed instant of a table service, to observe the
+   * service's most recent progress.
+   *
+   * <p>For the compaction and clustering metrics, the completed action is always resolved from the
+   * metric name, regardless of {@code action}:
+   * <ul>
+   *   <li>compaction resolves to {@link HoodieTimeline#COMMIT_ACTION};</li>
+   *   <li>clustering resolves to {@link HoodieTimeline#REPLACE_COMMIT_ACTION}.</li>
+   * </ul>
+   * For every other metric, {@code action} is used as-is. The gauge is registered under the
+   * resolved action.
+   *
+   * <p>When no matching completed instant exists, {@code 0} is reported, so the gauge is refreshed
+   * on every call. This matches the previous clustering-only implementation.
+   *
+   * <p>NOTE(review): {@code commit} and {@code replacecommit} may also be produced by operations other
+   * than compaction and clustering. Confirm this is acceptable for these metrics.
+   *
+   * @param activeTimeline active timeline to inspect
+   * @param metricName     name of the metric to update
+   * @param action         completed action to look for when not resolved from {@code metricName}
+   */
+  private void updateLatestCompletedInstant(final HoodieActiveTimeline activeTimeline,
+                                            final String metricName,
+                                            final String action) {
+    final String completedAction;
+    switch (metricName) {
+      case LATEST_COMPLETED_COMPACTION_INSTANT_STR:
+        completedAction = HoodieTimeline.COMMIT_ACTION;
+        break;
+      case LATEST_COMPLETED_CLUSTERING_INSTANT_STR:
+        completedAction = HoodieTimeline.REPLACE_COMMIT_ACTION;
+        break;
+      default:
+        completedAction = action;
+    }
+    Set<String> validActions = CollectionUtils.createSet(completedAction);
+    HoodieTimeline filteredInstants = activeTimeline.filterCompletedInstants().filter(instant -> validActions.contains(instant.getAction()));
+    Option<HoodieInstant> hoodieInstantOption = filteredInstants.lastInstant();
+    long latestCompletedInstant = hoodieInstantOption.isPresent()
+        ? Long.parseLong(hoodieInstantOption.get().requestedTime())
+        : 0L;
+    updateMetric(completedAction, metricName, latestCompletedInstant);
+  }
+
+  /**
+   * Reports how many instants of the given action are still pending (requested or inflight),
+   * i.e. how many plans are waiting to be executed.
+   *
+   * @param activeTimeline active timeline to inspect
+   * @param metricName     name of the metric to update
+   * @param action         pending action to count, e.g. {@link HoodieTimeline#COMPACTION_ACTION}
+   */
+  private void updatePendingInstantCount(final HoodieActiveTimeline activeTimeline,
+                                         final String metricName,
+                                         final String action) {
+    final Set<String> pendingActions = CollectionUtils.createSet(action);
+    final long pendingCount = activeTimeline.filterInflightsAndRequested()
+        .filter(candidate -> pendingActions.contains(candidate.getAction()))
+        .countInstants();
+    updateMetric(action, metricName, pendingCount);
+  }
+
+  /**
+   * Registers, or refreshes, the gauge named {@code getMetricsName(action, metricName)} with the
+   * given value. Does nothing when metrics are disabled.
+   *
+   * @param action      timeline action used to build the metric name
+   * @param metricName  metric suffix, e.g. {@link #PENDING_CLEAN_INSTANT_COUNT_STR}
+   * @param metricValue value to publish
+   */
+  private void updateMetric(final String action, final String metricName, final long metricValue) {
+    if (config.isMetricsOn()) {
-      LOG.info(
-          String.format("Sending timeline clustering instant metrics (%s=%d, %s=%d, %s=%d)", EARLIEST_INFLIGHT_CLUSTERING_INSTANT_STR, earliestInflightClusteringInstantLong,
-              LATEST_COMPLETED_CLUSTERING_INSTANT_STR, latestCompletedClusteringInstantLong, PENDING_CLUSTERING_INSTANT_COUNT_STR, pendingClusteringInstantCount));
-      metrics.registerGauge(getMetricsName(HoodieTimeline.CLUSTERING_ACTION, EARLIEST_INFLIGHT_CLUSTERING_INSTANT_STR), earliestInflightClusteringInstantLong);
-      metrics.registerGauge(getMetricsName(HoodieTimeline.CLUSTERING_ACTION, LATEST_COMPLETED_CLUSTERING_INSTANT_STR), latestCompletedClusteringInstantLong);
-      metrics.registerGauge(getMetricsName(HoodieTimeline.CLUSTERING_ACTION, PENDING_CLUSTERING_INSTANT_COUNT_STR), pendingClusteringInstantCount);
+      // Parameterized logging: the message is only formatted when INFO is enabled, instead of
+      // eagerly running String.format for each of the metrics updated per call.
+      LOG.info("Updating timeline instant related metrics ({}={})", metricName, metricValue);
+      metrics.registerGauge(getMetricsName(action, metricName), metricValue);
     }
   }
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
index 7da3c9479b7..862c9d8d7f1 100755
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
@@ -242,23 +242,71 @@ public class TestHoodieMetrics {
assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(),
metadata.getTotalRollbackLogBlocks());
});
- // Clustering Timeline Instant Metrics
+ // MOCK Timeline Instant Metrics for Clean & Rollback
+ HoodieInstant instant004 =
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.CLEAN_ACTION, "1004");
+ HoodieInstant instant007 =
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.CLEAN_ACTION, "1007");
+ HoodieInstant instant009 =
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.ROLLBACK_ACTION, "1009");
+ HoodieInstant instant0010 =
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.CLEAN_ACTION, "10010");
+ HoodieInstant instant0013 =
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.ROLLBACK_ACTION, "10013");
+ HoodieInstant instant0015 =
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.CLEAN_ACTION, "10015");
+ HoodieInstant instant0016 =
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.ROLLBACK_ACTION, "10016");
+ HoodieInstant instant0017 =
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.ROLLBACK_ACTION, "10017");
+
+ HoodieActiveTimeline activeTimeline1 = new
MockHoodieActiveTimeline(instant004, instant007, instant009, instant0010,
instant0013, instant0015, instant0016, instant0017);
+ hoodieMetrics.updateTableServiceInstantMetrics(activeTimeline1);
+
+ metricName = hoodieMetrics.getMetricsName(HoodieTimeline.CLEAN_ACTION,
HoodieMetrics.EARLIEST_PENDING_CLEAN_INSTANT_STR);
+
assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(),
Long.valueOf("1004"));
+ metricName = hoodieMetrics.getMetricsName(HoodieTimeline.ROLLBACK_ACTION,
HoodieMetrics.EARLIEST_PENDING_ROLLBACK_INSTANT_STR);
+
assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(),
Long.valueOf("1009"));
+
+ metricName = hoodieMetrics.getMetricsName(HoodieTimeline.CLEAN_ACTION,
HoodieMetrics.LATEST_COMPLETED_CLEAN_INSTANT_STR);
+
assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(),
Long.valueOf("1007"));
+ metricName = hoodieMetrics.getMetricsName(HoodieTimeline.ROLLBACK_ACTION,
HoodieMetrics.LATEST_COMPLETED_ROLLBACK_INSTANT_STR);
+
assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(),
Long.valueOf("10017"));
+
+ metricName = hoodieMetrics.getMetricsName(HoodieTimeline.CLEAN_ACTION,
HoodieMetrics.PENDING_CLEAN_INSTANT_COUNT_STR);
+
assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(),
3L);
+ metricName = hoodieMetrics.getMetricsName(HoodieTimeline.ROLLBACK_ACTION,
HoodieMetrics.PENDING_ROLLBACK_INSTANT_COUNT_STR);
+
assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(),
2L);
+
+ // MOCK Timeline Instant Metrics for Clustering
HoodieInstant instant001 =
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.CLUSTERING_ACTION, "1001");
- HoodieInstant instant0011 =
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.COMPACTION_ACTION, "11001");
- HoodieInstant instant002 =
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.CLUSTERING_ACTION, "1002");
- HoodieInstant instant003 =
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.CLUSTERING_ACTION, "1003");
- HoodieInstant instant004 =
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.CLUSTERING_ACTION, "1004");
HoodieInstant instant005 =
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.CLUSTERING_ACTION, "1005");
- HoodieInstant instant006 =
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.CLUSTERING_ACTION, "1006");
- HoodieActiveTimeline activeTimeline = new
MockHoodieActiveTimeline(instant001, instant0011, instant002, instant003,
instant004, instant005, instant006);
- hoodieMetrics.updateClusteringTimeLineInstantMetrics(activeTimeline);
+ HoodieInstant instant0011 =
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.REPLACE_COMMIT_ACTION, "10011");
+ HoodieInstant instant0018 =
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.CLUSTERING_ACTION, "10018");
+
+ HoodieActiveTimeline activeTimeline2 = new
MockHoodieActiveTimeline(instant001, instant005, instant0011, instant0018);
+ hoodieMetrics.updateTableServiceInstantMetrics(activeTimeline2);
+
+ metricName =
hoodieMetrics.getMetricsName(HoodieTimeline.CLUSTERING_ACTION,
HoodieMetrics.EARLIEST_PENDING_CLUSTERING_INSTANT_STR);
+
assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(),
Long.valueOf("10018"));
+
+ metricName =
hoodieMetrics.getMetricsName(HoodieTimeline.REPLACE_COMMIT_ACTION,
HoodieMetrics.LATEST_COMPLETED_CLUSTERING_INSTANT_STR);
+
assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(),
Long.valueOf("10011"));
- metricName =
hoodieMetrics.getMetricsName(HoodieTimeline.CLUSTERING_ACTION,
HoodieMetrics.EARLIEST_INFLIGHT_CLUSTERING_INSTANT_STR);
-
assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(),
Long.valueOf("1002"));
- metricName =
hoodieMetrics.getMetricsName(HoodieTimeline.CLUSTERING_ACTION,
HoodieMetrics.LATEST_COMPLETED_CLUSTERING_INSTANT_STR);
-
assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(),
Long.valueOf("1005"));
metricName =
hoodieMetrics.getMetricsName(HoodieTimeline.CLUSTERING_ACTION,
HoodieMetrics.PENDING_CLUSTERING_INSTANT_COUNT_STR);
-
assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(),
4L);
+
assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(),
1L);
+
+ // MOCK Timeline Instant Metrics for Compaction
+ HoodieInstant instant002 =
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.COMPACTION_ACTION, "1002");
+ HoodieInstant instant003 =
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.COMPACTION_ACTION, "1003");
+ HoodieInstant instant006 =
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.COMMIT_ACTION, "1006");
+ HoodieInstant instant008 =
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.COMPACTION_ACTION, "1008");
+ HoodieInstant instant0012 =
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.COMPACTION_ACTION, "10012");
+ HoodieInstant instant0014 =
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.COMPACTION_ACTION, "10014");
+
+ HoodieActiveTimeline activeTimeline3 = new
MockHoodieActiveTimeline(instant002, instant003, instant006, instant008,
instant0012, instant0014);
+ hoodieMetrics.updateTableServiceInstantMetrics(activeTimeline3);
+
+ metricName =
hoodieMetrics.getMetricsName(HoodieTimeline.COMPACTION_ACTION,
HoodieMetrics.EARLIEST_PENDING_COMPACTION_INSTANT_STR);
+
assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(),
Long.valueOf("1002"));
+
+ metricName = hoodieMetrics.getMetricsName(HoodieTimeline.COMMIT_ACTION,
HoodieMetrics.LATEST_COMPLETED_COMPACTION_INSTANT_STR);
+
assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(),
Long.valueOf("1006"));
+
+ metricName =
hoodieMetrics.getMetricsName(HoodieTimeline.COMPACTION_ACTION,
HoodieMetrics.PENDING_COMPACTION_INSTANT_COUNT_STR);
+
assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(),
5L);
}
private static class MockHoodieActiveTimeline extends ActiveTimelineV2 {