zhangyue19921010 commented on code in PR #12681:
URL: https://github.com/apache/hudi/pull/12681#discussion_r1927999273


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -565,7 +565,7 @@ protected void postCommit(HoodieTable table, 
HoodieCommitMetadata metadata, Stri
       // Delete the marker directory for the instant.
       WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
           .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
-      
metrics.updateClusteringTimeLineInstantMetrics(table.getActiveTimeline());
+      metrics.updateTimelineInstantMetrics(table.getActiveTimeline());

Review Comment:
   Could we rename this function to `updateTableServiceInstantMetrics`?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java:
##########
@@ -383,27 +396,67 @@ public void updateClusteringFileCreationMetrics(long 
durationInMs) {
     reportMetrics(HoodieTimeline.CLUSTERING_ACTION, "fileCreationTime", 
durationInMs);
   }
 
-  public void updateClusteringTimeLineInstantMetrics(final 
HoodieActiveTimeline activeTimeline) {
-    if (config.isMetricsOn()) {
-      // Compute Metrics
-      long pendingClusteringInstantCount = 
activeTimeline.filterPendingClusteringTimeline().getInstants().size();
-      long earliestInflightClusteringInstantLong = 0L;
-      Option<HoodieInstant> earliestInflightClusteringInstant = 
activeTimeline.filterPendingClusteringTimeline().getFirstPendingClusterInstant();
-      if (earliestInflightClusteringInstant.isPresent()) {
-        earliestInflightClusteringInstantLong = 
Long.valueOf(earliestInflightClusteringInstant.get().requestedTime());
-      }
-      long latestCompletedClusteringInstantLong = 0L;
-      Option<HoodieInstant> latestCompletedClusteringInstant = 
activeTimeline.filterCompletedInstants().getLastClusteringInstant();
-      if (latestCompletedClusteringInstant.isPresent()) {
-        latestCompletedClusteringInstantLong = 
Long.valueOf(latestCompletedClusteringInstant.get().requestedTime());
-      }
+  public void updateTimelineInstantMetrics(final HoodieActiveTimeline 
activeTimeline) {
+    updateEarliestPendingInstant(activeTimeline, 
EARLIEST_PENDING_CLUSTERING_INSTANT_STR, HoodieTimeline.CLUSTERING_ACTION);
+    updateEarliestPendingInstant(activeTimeline, 
EARLIEST_PENDING_COMPACTION_INSTANT_STR, HoodieTimeline.COMPACTION_ACTION);
+    updateEarliestPendingInstant(activeTimeline, 
EARLIEST_PENDING_CLEAN_INSTANT_STR, HoodieTimeline.CLEAN_ACTION);
+    updateEarliestPendingInstant(activeTimeline, 
EARLIEST_PENDING_ROLLBACK_INSTANT_STR, HoodieTimeline.ROLLBACK_ACTION);
+
+    updateLatestCompletedInstant(activeTimeline, 
LATEST_COMPLETED_CLUSTERING_INSTANT_STR, HoodieTimeline.REPLACE_COMMIT_ACTION);
+    updateLatestCompletedInstant(activeTimeline, 
LATEST_COMPLETED_COMPACTION_INSTANT_STR, HoodieTimeline.COMMIT_ACTION);
+    updateLatestCompletedInstant(activeTimeline, 
LATEST_COMPLETED_CLEAN_INSTANT_STR, HoodieTimeline.CLEAN_ACTION);
+    updateLatestCompletedInstant(activeTimeline, 
LATEST_COMPLETED_ROLLBACK_INSTANT_STR, HoodieTimeline.ROLLBACK_ACTION);
+
+    updatePendingInstantCount(activeTimeline, 
PENDING_CLUSTERING_INSTANT_COUNT_STR, HoodieTimeline.CLUSTERING_ACTION);
+    updatePendingInstantCount(activeTimeline, 
PENDING_COMPACTION_INSTANT_COUNT_STR, HoodieTimeline.COMPACTION_ACTION);
+    updatePendingInstantCount(activeTimeline, PENDING_CLEAN_INSTANT_COUNT_STR, 
HoodieTimeline.CLEAN_ACTION);
+    updatePendingInstantCount(activeTimeline, 
PENDING_ROLLBACK_INSTANT_COUNT_STR, HoodieTimeline.ROLLBACK_ACTION);
+  }
+
+  private void updateEarliestPendingInstant(final HoodieActiveTimeline 
activeTimeline,
+                                            final String metricName,
+                                            final String action) {
+    Set<String> validActions = CollectionUtils.createSet(action);
+    HoodieTimeline filteredInstants = 
activeTimeline.filterInflightsAndRequested().filter(instant -> 
validActions.contains(instant.getAction()));
+    Option<HoodieInstant> hoodieInstantOption = 
filteredInstants.firstInstant();
+    if (hoodieInstantOption.isPresent()) {
+      updateMetric(action, metricName, 
Long.parseLong(hoodieInstantOption.get().requestedTime()));
+    }
+  }
 
+  private void updateLatestCompletedInstant(final HoodieActiveTimeline 
activeTimeline,
+                                            final String metricName,
+                                            String action) {
+    String tableType = config.getTableType().name();
+    if (HoodieTableType.MERGE_ON_READ.name().equalsIgnoreCase(tableType)
+        && 
LATEST_COMPLETED_COMPACTION_INSTANT_STR.equalsIgnoreCase(metricName)) {
+      action = HoodieActiveTimeline.COMMIT_ACTION;
+    }
+    if (HoodieTableType.COPY_ON_WRITE.name().equalsIgnoreCase(tableType)
+        && 
LATEST_COMPLETED_CLUSTERING_INSTANT_STR.equalsIgnoreCase(metricName)) {
+      action = HoodieActiveTimeline.REPLACE_COMMIT_ACTION;
+    }
+    Set<String> validActions = CollectionUtils.createSet(action);
+    HoodieTimeline filteredInstants = 
activeTimeline.filterCompletedInstants().filter(instant -> 
validActions.contains(instant.getAction()));
+    Option<HoodieInstant> hoodieInstantOption = filteredInstants.lastInstant();
+    if (hoodieInstantOption.isPresent()) {
+      updateMetric(action, metricName, 
Long.parseLong(hoodieInstantOption.get().requestedTime()));
+    }
+  }
+
+  private void updatePendingInstantCount(final HoodieActiveTimeline 
activeTimeline,

Review Comment:
   Could we add some Javadoc? What does `PendingInstantCount` mean, and what 
is this metric used for?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java:
##########
@@ -383,27 +396,67 @@ public void updateClusteringFileCreationMetrics(long 
durationInMs) {
     reportMetrics(HoodieTimeline.CLUSTERING_ACTION, "fileCreationTime", 
durationInMs);
   }
 
-  public void updateClusteringTimeLineInstantMetrics(final 
HoodieActiveTimeline activeTimeline) {
-    if (config.isMetricsOn()) {
-      // Compute Metrics
-      long pendingClusteringInstantCount = 
activeTimeline.filterPendingClusteringTimeline().getInstants().size();
-      long earliestInflightClusteringInstantLong = 0L;
-      Option<HoodieInstant> earliestInflightClusteringInstant = 
activeTimeline.filterPendingClusteringTimeline().getFirstPendingClusterInstant();
-      if (earliestInflightClusteringInstant.isPresent()) {
-        earliestInflightClusteringInstantLong = 
Long.valueOf(earliestInflightClusteringInstant.get().requestedTime());
-      }
-      long latestCompletedClusteringInstantLong = 0L;
-      Option<HoodieInstant> latestCompletedClusteringInstant = 
activeTimeline.filterCompletedInstants().getLastClusteringInstant();
-      if (latestCompletedClusteringInstant.isPresent()) {
-        latestCompletedClusteringInstantLong = 
Long.valueOf(latestCompletedClusteringInstant.get().requestedTime());
-      }
+  public void updateTimelineInstantMetrics(final HoodieActiveTimeline 
activeTimeline) {
+    updateEarliestPendingInstant(activeTimeline, 
EARLIEST_PENDING_CLUSTERING_INSTANT_STR, HoodieTimeline.CLUSTERING_ACTION);
+    updateEarliestPendingInstant(activeTimeline, 
EARLIEST_PENDING_COMPACTION_INSTANT_STR, HoodieTimeline.COMPACTION_ACTION);
+    updateEarliestPendingInstant(activeTimeline, 
EARLIEST_PENDING_CLEAN_INSTANT_STR, HoodieTimeline.CLEAN_ACTION);
+    updateEarliestPendingInstant(activeTimeline, 
EARLIEST_PENDING_ROLLBACK_INSTANT_STR, HoodieTimeline.ROLLBACK_ACTION);
+
+    updateLatestCompletedInstant(activeTimeline, 
LATEST_COMPLETED_CLUSTERING_INSTANT_STR, HoodieTimeline.REPLACE_COMMIT_ACTION);
+    updateLatestCompletedInstant(activeTimeline, 
LATEST_COMPLETED_COMPACTION_INSTANT_STR, HoodieTimeline.COMMIT_ACTION);
+    updateLatestCompletedInstant(activeTimeline, 
LATEST_COMPLETED_CLEAN_INSTANT_STR, HoodieTimeline.CLEAN_ACTION);
+    updateLatestCompletedInstant(activeTimeline, 
LATEST_COMPLETED_ROLLBACK_INSTANT_STR, HoodieTimeline.ROLLBACK_ACTION);
+
+    updatePendingInstantCount(activeTimeline, 
PENDING_CLUSTERING_INSTANT_COUNT_STR, HoodieTimeline.CLUSTERING_ACTION);
+    updatePendingInstantCount(activeTimeline, 
PENDING_COMPACTION_INSTANT_COUNT_STR, HoodieTimeline.COMPACTION_ACTION);
+    updatePendingInstantCount(activeTimeline, PENDING_CLEAN_INSTANT_COUNT_STR, 
HoodieTimeline.CLEAN_ACTION);
+    updatePendingInstantCount(activeTimeline, 
PENDING_ROLLBACK_INSTANT_COUNT_STR, HoodieTimeline.ROLLBACK_ACTION);
+  }
+
+  private void updateEarliestPendingInstant(final HoodieActiveTimeline 
activeTimeline,
+                                            final String metricName,
+                                            final String action) {
+    Set<String> validActions = CollectionUtils.createSet(action);
+    HoodieTimeline filteredInstants = 
activeTimeline.filterInflightsAndRequested().filter(instant -> 
validActions.contains(instant.getAction()));
+    Option<HoodieInstant> hoodieInstantOption = 
filteredInstants.firstInstant();
+    if (hoodieInstantOption.isPresent()) {
+      updateMetric(action, metricName, 
Long.parseLong(hoodieInstantOption.get().requestedTime()));
+    }
+  }
 
+  private void updateLatestCompletedInstant(final HoodieActiveTimeline 
activeTimeline,
+                                            final String metricName,
+                                            String action) {
+    String tableType = config.getTableType().name();
+    if (HoodieTableType.MERGE_ON_READ.name().equalsIgnoreCase(tableType)
+        && 
LATEST_COMPLETED_COMPACTION_INSTANT_STR.equalsIgnoreCase(metricName)) {
+      action = HoodieActiveTimeline.COMMIT_ACTION;
+    }
+    if (HoodieTableType.COPY_ON_WRITE.name().equalsIgnoreCase(tableType)
+        && 
LATEST_COMPLETED_CLUSTERING_INSTANT_STR.equalsIgnoreCase(metricName)) {
+      action = HoodieActiveTimeline.REPLACE_COMMIT_ACTION;
+    }

Review Comment:
   Could we use a switch statement here?
   Also, there is no need to check the table type, because MOR tables can also run clustering.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to