zhangyue19921010 commented on code in PR #12681:
URL: https://github.com/apache/hudi/pull/12681#discussion_r1927999273
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -565,7 +565,7 @@ protected void postCommit(HoodieTable table,
HoodieCommitMetadata metadata, Stri
// Delete the marker directory for the instant.
WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
-
metrics.updateClusteringTimeLineInstantMetrics(table.getActiveTimeline());
+ metrics.updateTimelineInstantMetrics(table.getActiveTimeline());
Review Comment:
Could we rename this function to `updateTableServiceInstantMetrics`?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java:
##########
@@ -383,27 +396,67 @@ public void updateClusteringFileCreationMetrics(long
durationInMs) {
reportMetrics(HoodieTimeline.CLUSTERING_ACTION, "fileCreationTime",
durationInMs);
}
- public void updateClusteringTimeLineInstantMetrics(final
HoodieActiveTimeline activeTimeline) {
- if (config.isMetricsOn()) {
- // Compute Metrics
- long pendingClusteringInstantCount =
activeTimeline.filterPendingClusteringTimeline().getInstants().size();
- long earliestInflightClusteringInstantLong = 0L;
- Option<HoodieInstant> earliestInflightClusteringInstant =
activeTimeline.filterPendingClusteringTimeline().getFirstPendingClusterInstant();
- if (earliestInflightClusteringInstant.isPresent()) {
- earliestInflightClusteringInstantLong =
Long.valueOf(earliestInflightClusteringInstant.get().requestedTime());
- }
- long latestCompletedClusteringInstantLong = 0L;
- Option<HoodieInstant> latestCompletedClusteringInstant =
activeTimeline.filterCompletedInstants().getLastClusteringInstant();
- if (latestCompletedClusteringInstant.isPresent()) {
- latestCompletedClusteringInstantLong =
Long.valueOf(latestCompletedClusteringInstant.get().requestedTime());
- }
+ public void updateTimelineInstantMetrics(final HoodieActiveTimeline
activeTimeline) {
+ updateEarliestPendingInstant(activeTimeline,
EARLIEST_PENDING_CLUSTERING_INSTANT_STR, HoodieTimeline.CLUSTERING_ACTION);
+ updateEarliestPendingInstant(activeTimeline,
EARLIEST_PENDING_COMPACTION_INSTANT_STR, HoodieTimeline.COMPACTION_ACTION);
+ updateEarliestPendingInstant(activeTimeline,
EARLIEST_PENDING_CLEAN_INSTANT_STR, HoodieTimeline.CLEAN_ACTION);
+ updateEarliestPendingInstant(activeTimeline,
EARLIEST_PENDING_ROLLBACK_INSTANT_STR, HoodieTimeline.ROLLBACK_ACTION);
+
+ updateLatestCompletedInstant(activeTimeline,
LATEST_COMPLETED_CLUSTERING_INSTANT_STR, HoodieTimeline.REPLACE_COMMIT_ACTION);
+ updateLatestCompletedInstant(activeTimeline,
LATEST_COMPLETED_COMPACTION_INSTANT_STR, HoodieTimeline.COMMIT_ACTION);
+ updateLatestCompletedInstant(activeTimeline,
LATEST_COMPLETED_CLEAN_INSTANT_STR, HoodieTimeline.CLEAN_ACTION);
+ updateLatestCompletedInstant(activeTimeline,
LATEST_COMPLETED_ROLLBACK_INSTANT_STR, HoodieTimeline.ROLLBACK_ACTION);
+
+ updatePendingInstantCount(activeTimeline,
PENDING_CLUSTERING_INSTANT_COUNT_STR, HoodieTimeline.CLUSTERING_ACTION);
+ updatePendingInstantCount(activeTimeline,
PENDING_COMPACTION_INSTANT_COUNT_STR, HoodieTimeline.COMPACTION_ACTION);
+ updatePendingInstantCount(activeTimeline, PENDING_CLEAN_INSTANT_COUNT_STR,
HoodieTimeline.CLEAN_ACTION);
+ updatePendingInstantCount(activeTimeline,
PENDING_ROLLBACK_INSTANT_COUNT_STR, HoodieTimeline.ROLLBACK_ACTION);
+ }
+
+ private void updateEarliestPendingInstant(final HoodieActiveTimeline
activeTimeline,
+ final String metricName,
+ final String action) {
+ Set<String> validActions = CollectionUtils.createSet(action);
+ HoodieTimeline filteredInstants =
activeTimeline.filterInflightsAndRequested().filter(instant ->
validActions.contains(instant.getAction()));
+ Option<HoodieInstant> hoodieInstantOption =
filteredInstants.firstInstant();
+ if (hoodieInstantOption.isPresent()) {
+ updateMetric(action, metricName,
Long.parseLong(hoodieInstantOption.get().requestedTime()));
+ }
+ }
+ private void updateLatestCompletedInstant(final HoodieActiveTimeline
activeTimeline,
+ final String metricName,
+ String action) {
+ String tableType = config.getTableType().name();
+ if (HoodieTableType.MERGE_ON_READ.name().equalsIgnoreCase(tableType)
+ &&
LATEST_COMPLETED_COMPACTION_INSTANT_STR.equalsIgnoreCase(metricName)) {
+ action = HoodieActiveTimeline.COMMIT_ACTION;
+ }
+ if (HoodieTableType.COPY_ON_WRITE.name().equalsIgnoreCase(tableType)
+ &&
LATEST_COMPLETED_CLUSTERING_INSTANT_STR.equalsIgnoreCase(metricName)) {
+ action = HoodieActiveTimeline.REPLACE_COMMIT_ACTION;
+ }
+ Set<String> validActions = CollectionUtils.createSet(action);
+ HoodieTimeline filteredInstants =
activeTimeline.filterCompletedInstants().filter(instant ->
validActions.contains(instant.getAction()));
+ Option<HoodieInstant> hoodieInstantOption = filteredInstants.lastInstant();
+ if (hoodieInstantOption.isPresent()) {
+ updateMetric(action, metricName,
Long.parseLong(hoodieInstantOption.get().requestedTime()));
+ }
+ }
+
+ private void updatePendingInstantCount(final HoodieActiveTimeline
activeTimeline,
Review Comment:
Could we add some Javadoc? What does `PendingInstantCount` mean, and what
is this metric used for?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java:
##########
@@ -383,27 +396,67 @@ public void updateClusteringFileCreationMetrics(long
durationInMs) {
reportMetrics(HoodieTimeline.CLUSTERING_ACTION, "fileCreationTime",
durationInMs);
}
- public void updateClusteringTimeLineInstantMetrics(final
HoodieActiveTimeline activeTimeline) {
- if (config.isMetricsOn()) {
- // Compute Metrics
- long pendingClusteringInstantCount =
activeTimeline.filterPendingClusteringTimeline().getInstants().size();
- long earliestInflightClusteringInstantLong = 0L;
- Option<HoodieInstant> earliestInflightClusteringInstant =
activeTimeline.filterPendingClusteringTimeline().getFirstPendingClusterInstant();
- if (earliestInflightClusteringInstant.isPresent()) {
- earliestInflightClusteringInstantLong =
Long.valueOf(earliestInflightClusteringInstant.get().requestedTime());
- }
- long latestCompletedClusteringInstantLong = 0L;
- Option<HoodieInstant> latestCompletedClusteringInstant =
activeTimeline.filterCompletedInstants().getLastClusteringInstant();
- if (latestCompletedClusteringInstant.isPresent()) {
- latestCompletedClusteringInstantLong =
Long.valueOf(latestCompletedClusteringInstant.get().requestedTime());
- }
+ public void updateTimelineInstantMetrics(final HoodieActiveTimeline
activeTimeline) {
+ updateEarliestPendingInstant(activeTimeline,
EARLIEST_PENDING_CLUSTERING_INSTANT_STR, HoodieTimeline.CLUSTERING_ACTION);
+ updateEarliestPendingInstant(activeTimeline,
EARLIEST_PENDING_COMPACTION_INSTANT_STR, HoodieTimeline.COMPACTION_ACTION);
+ updateEarliestPendingInstant(activeTimeline,
EARLIEST_PENDING_CLEAN_INSTANT_STR, HoodieTimeline.CLEAN_ACTION);
+ updateEarliestPendingInstant(activeTimeline,
EARLIEST_PENDING_ROLLBACK_INSTANT_STR, HoodieTimeline.ROLLBACK_ACTION);
+
+ updateLatestCompletedInstant(activeTimeline,
LATEST_COMPLETED_CLUSTERING_INSTANT_STR, HoodieTimeline.REPLACE_COMMIT_ACTION);
+ updateLatestCompletedInstant(activeTimeline,
LATEST_COMPLETED_COMPACTION_INSTANT_STR, HoodieTimeline.COMMIT_ACTION);
+ updateLatestCompletedInstant(activeTimeline,
LATEST_COMPLETED_CLEAN_INSTANT_STR, HoodieTimeline.CLEAN_ACTION);
+ updateLatestCompletedInstant(activeTimeline,
LATEST_COMPLETED_ROLLBACK_INSTANT_STR, HoodieTimeline.ROLLBACK_ACTION);
+
+ updatePendingInstantCount(activeTimeline,
PENDING_CLUSTERING_INSTANT_COUNT_STR, HoodieTimeline.CLUSTERING_ACTION);
+ updatePendingInstantCount(activeTimeline,
PENDING_COMPACTION_INSTANT_COUNT_STR, HoodieTimeline.COMPACTION_ACTION);
+ updatePendingInstantCount(activeTimeline, PENDING_CLEAN_INSTANT_COUNT_STR,
HoodieTimeline.CLEAN_ACTION);
+ updatePendingInstantCount(activeTimeline,
PENDING_ROLLBACK_INSTANT_COUNT_STR, HoodieTimeline.ROLLBACK_ACTION);
+ }
+
+ private void updateEarliestPendingInstant(final HoodieActiveTimeline
activeTimeline,
+ final String metricName,
+ final String action) {
+ Set<String> validActions = CollectionUtils.createSet(action);
+ HoodieTimeline filteredInstants =
activeTimeline.filterInflightsAndRequested().filter(instant ->
validActions.contains(instant.getAction()));
+ Option<HoodieInstant> hoodieInstantOption =
filteredInstants.firstInstant();
+ if (hoodieInstantOption.isPresent()) {
+ updateMetric(action, metricName,
Long.parseLong(hoodieInstantOption.get().requestedTime()));
+ }
+ }
+ private void updateLatestCompletedInstant(final HoodieActiveTimeline
activeTimeline,
+ final String metricName,
+ String action) {
+ String tableType = config.getTableType().name();
+ if (HoodieTableType.MERGE_ON_READ.name().equalsIgnoreCase(tableType)
+ &&
LATEST_COMPLETED_COMPACTION_INSTANT_STR.equalsIgnoreCase(metricName)) {
+ action = HoodieActiveTimeline.COMMIT_ACTION;
+ }
+ if (HoodieTableType.COPY_ON_WRITE.name().equalsIgnoreCase(tableType)
+ &&
LATEST_COMPLETED_CLUSTERING_INSTANT_STR.equalsIgnoreCase(metricName)) {
+ action = HoodieActiveTimeline.REPLACE_COMMIT_ACTION;
+ }
Review Comment:
Could we use a switch statement here?
Also, there is no need to check tableType, because MOR tables can also do clustering.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]