This is an automated email from the ASF dual-hosted git repository.

jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b05fbf11 [AMORO-2993] Enrich optimizing metrics (#2994)
9b05fbf11 is described below

commit 9b05fbf11e37ad69b050180e8b26064466586807
Author: Xavier Bai <[email protected]>
AuthorDate: Mon Aug 19 14:07:47 2024 +0800

    [AMORO-2993] Enrich optimizing metrics (#2994)
    
    * Rebase orphan metrics
    
    * rebase master
    
    * add doc
    
    * rename
    
    * adjust
    
    * fix some bugs
---
 .../amoro/server/table/TableOptimizingMetrics.java | 156 ++++++++++++++++++++-
 .../apache/amoro/server/table/TableRuntime.java    |  44 ++++--
 .../amoro/server/utils/IcebergTableUtil.java       |  10 ++
 .../amoro-bin/conf/plugins/metric-reporters.yaml   |   2 +-
 docs/user-guides/metrics.md                        |  45 +++---
 5 files changed, 227 insertions(+), 30 deletions(-)

diff --git 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableOptimizingMetrics.java
 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableOptimizingMetrics.java
index d97a64e2e..13a2f2119 100644
--- 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableOptimizingMetrics.java
+++ 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableOptimizingMetrics.java
@@ -27,11 +27,16 @@ import org.apache.amoro.api.metrics.Gauge;
 import org.apache.amoro.api.metrics.Metric;
 import org.apache.amoro.api.metrics.MetricDefine;
 import org.apache.amoro.api.metrics.MetricKey;
+import org.apache.amoro.server.AmoroServiceConstants;
 import org.apache.amoro.server.metrics.MetricRegistry;
 import org.apache.amoro.server.optimizing.OptimizingStatus;
 import org.apache.amoro.server.optimizing.OptimizingType;
+import org.apache.amoro.server.optimizing.maintainer.IcebergTableMaintainer;
 import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableMap;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
+import org.apache.amoro.shade.guava32.com.google.common.primitives.Longs;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
 
 import java.util.List;
 
@@ -163,6 +168,37 @@ public class TableOptimizingMetrics {
           .withTags("catalog", "database", "table")
           .build();
 
+  public static final MetricDefine 
TABLE_OPTIMIZING_SINCE_LAST_MINOR_OPTIMIZATION =
+      defineGauge("table_optimizing_since_last_minor_optimization_mills")
+          .withDescription("Duration in milliseconds since last successful 
minor optimization")
+          .withTags("catalog", "database", "table")
+          .build();
+
+  public static final MetricDefine 
TABLE_OPTIMIZING_SINCE_LAST_MAJOR_OPTIMIZATION =
+      defineGauge("table_optimizing_since_last_major_optimization_mills")
+          .withDescription("Duration in milliseconds since last successful 
major optimization")
+          .withTags("catalog", "database", "table")
+          .build();
+
+  public static final MetricDefine 
TABLE_OPTIMIZING_SINCE_LAST_FULL_OPTIMIZATION =
+      defineGauge("table_optimizing_since_last_full_optimization_mills")
+          .withDescription("Duration in milliseconds since last successful 
full optimization")
+          .withTags("catalog", "database", "table")
+          .build();
+
+  public static final MetricDefine TABLE_OPTIMIZING_SINCE_LAST_OPTIMIZATION =
+      defineGauge("table_optimizing_since_last_optimization_mills")
+          .withDescription("Duration in milliseconds since last successful 
optimization")
+          .withTags("catalog", "database", "table")
+          .build();
+
+  public static final MetricDefine TABLE_OPTIMIZING_LAG_DURATION =
+      defineGauge("table_optimizing_lag_duration_mills")
+          .withDescription(
+              "Duration in milliseconds between last self-optimizing snapshot 
and refreshed snapshot")
+          .withTags("catalog", "database", "table")
+          .build();
+
   private final Counter processTotalCount = new Counter();
   private final Counter processFailedCount = new Counter();
   private final Counter minorTotalCount = new Counter();
@@ -176,6 +212,9 @@ public class TableOptimizingMetrics {
 
   private OptimizingStatus optimizingStatus = OptimizingStatus.IDLE;
   private long statusSetTimestamp = System.currentTimeMillis();
+  private long lastMinorTime, lastMajorTime, lastFullTime;
+  private long lastNonMaintainedTime = AmoroServiceConstants.INVALID_TIME;
+  private long lastOptimizingTime = AmoroServiceConstants.INVALID_TIME;
   private final List<MetricKey> registeredMetricKeys = Lists.newArrayList();
   private MetricRegistry globalRegistry;
 
@@ -241,6 +280,23 @@ public class TableOptimizingMetrics {
       registerMetric(registry, TABLE_OPTIMIZING_FULL_TOTAL_COUNT, 
fullTotalCount);
       registerMetric(registry, TABLE_OPTIMIZING_FULL_FAILED_COUNT, 
fullFailedCount);
 
+      // register last optimizing duration metrics
+      registerMetric(
+          registry,
+          TABLE_OPTIMIZING_SINCE_LAST_MINOR_OPTIMIZATION,
+          new LastOptimizingDurationGauge(OptimizingType.MINOR));
+      registerMetric(
+          registry,
+          TABLE_OPTIMIZING_SINCE_LAST_MAJOR_OPTIMIZATION,
+          new LastOptimizingDurationGauge(OptimizingType.MAJOR));
+      registerMetric(
+          registry,
+          TABLE_OPTIMIZING_SINCE_LAST_FULL_OPTIMIZATION,
+          new LastOptimizingDurationGauge(OptimizingType.FULL));
+      registerMetric(
+          registry, TABLE_OPTIMIZING_SINCE_LAST_OPTIMIZATION, new 
LastOptimizingDurationGauge());
+      registerMetric(registry, TABLE_OPTIMIZING_LAG_DURATION, new 
OptimizingLagDurationGauge());
+
       globalRegistry = registry;
     }
   }
@@ -262,13 +318,60 @@ public class TableOptimizingMetrics {
     this.statusSetTimestamp = statusSetTimestamp;
   }
 
+  /**
+   * Record the time of the last optimizing process of the given type.
+   *
+   * @param processType optimizing process type.
+   * @param lastTime last optimizing timestamp.
+   */
+  public void lastOptimizingTime(OptimizingType processType, long lastTime) {
+    switch (processType) {
+      case MINOR:
+        this.lastMinorTime = lastTime;
+        break;
+      case MAJOR:
+        this.lastMajorTime = lastTime;
+        break;
+      case FULL:
+        this.lastFullTime = lastTime;
+        break;
+    }
+  }
+
+  public void nonMaintainedSnapshotTime(Snapshot snapshot) {
+    if (snapshot == null) {
+      return;
+    }
+    // skip snapshots created by Amoro maintenance commits or adding no files
+    if (snapshot.summary().values().stream()
+            .anyMatch(IcebergTableMaintainer.AMORO_MAINTAIN_COMMITS::contains)
+        || 
Long.parseLong(snapshot.summary().getOrDefault(SnapshotSummary.ADDED_FILES_PROP,
 "0"))
+            == 0) {
+      return;
+    }
+
+    this.lastNonMaintainedTime = Longs.max(lastNonMaintainedTime, 
snapshot.timestampMillis());
+  }
+
+  public void lastOptimizingSnapshotTime(Snapshot snapshot) {
+    if (snapshot == null) {
+      return;
+    }
+
+    this.lastOptimizingTime =
+        Longs.max(
+            lastOptimizingTime,
+            snapshot.timestampMillis(),
+            Longs.max(lastMinorTime, lastMajorTime, lastFullTime));
+  }
+
   /**
    * Handle table self optimizing process completed event.
    *
    * @param processType optimizing process type.
    * @param success is optimizing process success.
    */
-  public void processComplete(OptimizingType processType, boolean success) {
+  public void processComplete(OptimizingType processType, boolean success, 
long planTime) {
     processTotalCount.inc();
     Counter totalCounter = null;
     Counter failedCounter = null;
@@ -286,6 +389,9 @@ public class TableOptimizingMetrics {
         failedCounter = fullFailedCount;
         break;
     }
+    if (success) {
+      lastOptimizingTime(processType, planTime);
+    }
     if (totalCounter != null) {
       totalCounter.inc();
     }
@@ -334,6 +440,54 @@ public class TableOptimizingMetrics {
     }
   }
 
+  class LastOptimizingDurationGauge implements Gauge<Long> {
+    final OptimizingType optimizingType;
+
+    LastOptimizingDurationGauge(OptimizingType optimizingType) {
+      this.optimizingType = optimizingType;
+    }
+
+    LastOptimizingDurationGauge() {
+      optimizingType = null;
+    }
+
+    @Override
+    public Long getValue() {
+      if (optimizingType == null) {
+        return optimizingInterval(lastOptimizingTime);
+      }
+
+      switch (optimizingType) {
+        case MINOR:
+          return optimizingInterval(lastMinorTime);
+        case MAJOR:
+          return optimizingInterval(lastMajorTime);
+        case FULL:
+          return optimizingInterval(lastFullTime);
+        default:
+          return AmoroServiceConstants.INVALID_TIME;
+      }
+    }
+
+    private long optimizingInterval(long lastOptimizedTime) {
+      return lastOptimizedTime > 0
+          ? System.currentTimeMillis() - lastOptimizedTime
+          : AmoroServiceConstants.INVALID_TIME;
+    }
+  }
+
+  class OptimizingLagDurationGauge implements Gauge<Long> {
+    @Override
+    public Long getValue() {
+      if (lastNonMaintainedTime == AmoroServiceConstants.INVALID_TIME
+          || lastOptimizingTime == AmoroServiceConstants.INVALID_TIME) {
+        return AmoroServiceConstants.INVALID_TIME;
+      } else {
+        return lastNonMaintainedTime - lastOptimizingTime;
+      }
+    }
+  }
+
   class IsInStatusGauge implements Gauge<Long> {
     final String targetStatus;
 
diff --git 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntime.java
 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntime.java
index 9cf17b4fc..ae9ce9b93 100644
--- 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntime.java
+++ 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntime.java
@@ -40,7 +40,10 @@ import org.apache.amoro.server.table.blocker.TableBlocker;
 import org.apache.amoro.server.utils.IcebergTableUtil;
 import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects;
 import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
+import org.apache.amoro.table.BaseTable;
+import org.apache.amoro.table.ChangeTable;
 import org.apache.amoro.table.MixedTable;
+import org.apache.amoro.table.UnkeyedTable;
 import org.apache.iceberg.Snapshot;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -133,6 +136,9 @@ public class TableRuntime extends StatedPersistentBase {
     this.pendingInput = tableRuntimeMeta.getPendingInput();
     optimizingMetrics = new TableOptimizingMetrics(tableIdentifier);
     optimizingMetrics.statusChanged(optimizingStatus, 
this.currentStatusStartTime);
+    optimizingMetrics.lastOptimizingTime(OptimizingType.MINOR, 
this.lastMinorOptimizingTime);
+    optimizingMetrics.lastOptimizingTime(OptimizingType.MAJOR, 
this.lastMajorOptimizingTime);
+    optimizingMetrics.lastOptimizingTime(OptimizingType.FULL, 
this.lastFullOptimizingTime);
     orphanFilesCleaningMetrics = new 
TableOrphanFilesCleaningMetrics(tableIdentifier);
   }
 
@@ -300,10 +306,10 @@ public class TableRuntime extends StatedPersistentBase {
               lastFullOptimizingTime = optimizingProcess.getPlanTime();
             }
           }
+          optimizingMetrics.processComplete(processType, success, 
optimizingProcess.getPlanTime());
           updateOptimizingStatus(OptimizingStatus.IDLE);
           optimizingProcess = null;
           persistUpdatingRuntime();
-          optimizingMetrics.processComplete(processType, success);
           tableHandler.handleTableChanged(this, originalStatus);
         });
   }
@@ -316,12 +322,16 @@ public class TableRuntime extends StatedPersistentBase {
 
   private boolean refreshSnapshots(AmoroTable<?> amoroTable) {
     MixedTable table = (MixedTable) amoroTable.originalTable();
+
+    long lastSnapshotId = currentSnapshotId;
     if (table.isKeyedTable()) {
-      long lastSnapshotId = currentSnapshotId;
       long changeSnapshotId = currentChangeSnapshotId;
-      currentSnapshotId = 
IcebergTableUtil.getSnapshotId(table.asKeyedTable().baseTable(), false);
-      currentChangeSnapshotId =
-          IcebergTableUtil.getSnapshotId(table.asKeyedTable().changeTable(), 
false);
+      ChangeTable changeTable = table.asKeyedTable().changeTable();
+      BaseTable baseTable = table.asKeyedTable().baseTable();
+
+      currentChangeSnapshotId = doRefreshSnapshots(changeTable);
+      currentSnapshotId = doRefreshSnapshots(baseTable);
+
       if (currentSnapshotId != lastSnapshotId || currentChangeSnapshotId != 
changeSnapshotId) {
         LOG.info(
             "Refreshing table {} with base snapshot id {} and change snapshot 
id {}",
@@ -331,9 +341,7 @@ public class TableRuntime extends StatedPersistentBase {
         return true;
       }
     } else {
-      long lastSnapshotId = currentSnapshotId;
-      Snapshot currentSnapshot = table.asUnkeyedTable().currentSnapshot();
-      currentSnapshotId = currentSnapshot == null ? -1 : 
currentSnapshot.snapshotId();
+      currentSnapshotId = doRefreshSnapshots((UnkeyedTable) table);
       if (currentSnapshotId != lastSnapshotId) {
         LOG.info(
             "Refreshing table {} with base snapshot id {}", tableIdentifier, 
currentSnapshotId);
@@ -343,6 +351,26 @@ public class TableRuntime extends StatedPersistentBase {
     return false;
   }
 
+  /**
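+   * Read the table's snapshot and feed snapshot times to optimizingMetrics.
+   *
+   * @param table the table whose snapshot is read
+   * @return the snapshot id, or INVALID_SNAPSHOT_ID when no snapshot exists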
+   */
+  private long doRefreshSnapshots(UnkeyedTable table) {
+    long currentSnapshotId = AmoroServiceConstants.INVALID_SNAPSHOT_ID;
+    Snapshot currentSnapshot = IcebergTableUtil.getSnapshot(table, false);
+    if (currentSnapshot != null) {
+      currentSnapshotId = currentSnapshot.snapshotId();
+    }
+
+    optimizingMetrics.nonMaintainedSnapshotTime(currentSnapshot);
+    optimizingMetrics.lastOptimizingSnapshotTime(
+        IcebergTableUtil.findLatestOptimizingSnapshot(table).orElse(null));
+
+    return currentSnapshotId;
+  }
+
   public OptimizingEvaluator.PendingInput getPendingInput() {
     return pendingInput;
   }
diff --git 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/utils/IcebergTableUtil.java
 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/utils/IcebergTableUtil.java
index 911c97ad9..2a78dc34b 100644
--- 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/utils/IcebergTableUtil.java
+++ 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/utils/IcebergTableUtil.java
@@ -19,6 +19,7 @@
 package org.apache.amoro.server.utils;
 
 import org.apache.amoro.IcebergFileEntry;
+import org.apache.amoro.api.CommitMetaProducer;
 import org.apache.amoro.scan.TableEntriesScan;
 import org.apache.amoro.server.AmoroServiceConstants;
 import org.apache.amoro.server.table.BasicTableSnapshot;
@@ -33,6 +34,7 @@ import 
org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
 import org.apache.amoro.table.MixedTable;
 import org.apache.amoro.utils.TableFileUtil;
 import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.DataOperations;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileScanTask;
@@ -95,6 +97,14 @@ public class IcebergTableUtil {
     return Optional.ofNullable(Iterables.tryFind(snapshots, 
predicate).orNull());
   }
 
+  public static Optional<Snapshot> findLatestOptimizingSnapshot(Table table) {
+    return IcebergTableUtil.findFirstMatchSnapshot(
+        table,
+        snapshot ->
+            
snapshot.summary().containsValue(CommitMetaProducer.OPTIMIZE.name())
+                && DataOperations.REPLACE.equals(snapshot.operation()));
+  }
+
   public static Set<String> getAllContentFilePath(Table internalTable) {
     Set<String> validFilesPath = new HashSet<>();
 
diff --git 
a/amoro-ams/dist/src/main/amoro-bin/conf/plugins/metric-reporters.yaml 
b/amoro-ams/dist/src/main/amoro-bin/conf/plugins/metric-reporters.yaml
index a68a772b2..1a9639f6a 100644
--- a/amoro-ams/dist/src/main/amoro-bin/conf/plugins/metric-reporters.yaml
+++ b/amoro-ams/dist/src/main/amoro-bin/conf/plugins/metric-reporters.yaml
@@ -20,6 +20,6 @@
 
 #metric-reporters:
 #  - name: prometheus-exporter            # configs for prometheus exporter
-#    enabled: false
+#    enabled: true
 #    properties:
 #       port: 7001
\ No newline at end of file
diff --git a/docs/user-guides/metrics.md b/docs/user-guides/metrics.md
index 83a4010a4..c84d565d8 100644
--- a/docs/user-guides/metrics.md
+++ b/docs/user-guides/metrics.md
@@ -37,26 +37,31 @@ Amoro has supported built-in metrics to measure status of 
table self-optimizing
 
 ## Self-optimizing metrics
 
-| Metric Name                                       | Type    | Tags           
          | Description                                                  |
-|---------------------------------------------------|---------|--------------------------|--------------------------------------------------------------|
-| table_optimizing_status_idle_duration_mills       | Gauge   | catalog, 
database, table | Duration in milliseconds after table be in idle status       |
-| table_optimizing_status_pending_duration_mills    | Gauge   | catalog, 
database, table | Duration in milliseconds after table be in pending status    |
-| table_optimizing_status_planning_duration_mills   | Gauge   | catalog, 
database, table | Duration in milliseconds after table be in planning status   |
-| table_optimizing_status_executing_duration_mills  | Gauge   | catalog, 
database, table | Duration in milliseconds after table be in executing status  |
-| table_optimizing_status_committing_duration_mills | Gauge   | catalog, 
database, table | Duration in milliseconds after table be in committing status |
-| table_optimizing_process_total_count              | Counter | catalog, 
database, table | Count of all optimizing process since ams started            |
-| table_optimizing_process_failed_count             | Counter | catalog, 
database, table | Count of failed optimizing process since ams started         |
-| table_optimizing_minor_total_count                | Counter | catalog, 
database, table | Count of minor optimizing process since ams started          |
-| table_optimizing_minor_failed_count               | Counter | catalog, 
database, table | Count of failed minor optimizing process since ams started   |
-| table_optimizing_major_total_count                | Counter | catalog, 
database, table | Count of major optimizing process since ams started          |
-| table_optimizing_major_failed_count               | Counter | catalog, 
database, table | Count of failed major optimizing process since ams started   |
-| table_optimizing_full_total_count                 | Counter | catalog, 
database, table | Count of full optimizing rocess since ams started            |
-| table_optimizing_full_failed_count                | Counter | catalog, 
database, table | Count of failed full optimizing process since ams started    |
-| table_optimizing_status_in_idle                   | Gauge   | catalog, 
database, table | If currently table is in idle status                         |
-| table_optimizing_status_in_pending                | Gauge   | catalog, 
database, table | If currently table is in pending status                      |
-| table_optimizing_status_in_planning               | Gauge   | catalog, 
database, table | If currently table is in planning status                     |
-| table_optimizing_status_in_executing              | Gauge   | catalog, 
database, table | If currently table is in executing status                    |
-| table_optimizing_status_in_committing             | Gauge   | catalog, 
database, table | If currently table is in committing status                   |
+| Metric Name                                          | Type    | Tags        
             | Description                                                      
                      |
+|------------------------------------------------------|---------|--------------------------|----------------------------------------------------------------------------------------|
+| table_optimizing_status_idle_duration_mills          | Gauge   | catalog, 
database, table | Duration in milliseconds after table be in idle status        
                         |
+| table_optimizing_status_pending_duration_mills       | Gauge   | catalog, 
database, table | Duration in milliseconds after table be in pending status     
                         |
+| table_optimizing_status_planning_duration_mills      | Gauge   | catalog, 
database, table | Duration in milliseconds after table be in planning status    
                         |
+| table_optimizing_status_executing_duration_mills     | Gauge   | catalog, 
database, table | Duration in milliseconds after table be in executing status   
                         |
+| table_optimizing_status_committing_duration_mills    | Gauge   | catalog, 
database, table | Duration in milliseconds after table be in committing status  
                         |
+| table_optimizing_process_total_count                 | Counter | catalog, 
database, table | Count of all optimizing process since ams started             
                         |
+| table_optimizing_process_failed_count                | Counter | catalog, 
database, table | Count of failed optimizing process since ams started          
                         |
+| table_optimizing_minor_total_count                   | Counter | catalog, 
database, table | Count of minor optimizing process since ams started           
                         |
+| table_optimizing_minor_failed_count                  | Counter | catalog, 
database, table | Count of failed minor optimizing process since ams started    
                         |
+| table_optimizing_major_total_count                   | Counter | catalog, 
database, table | Count of major optimizing process since ams started           
                         |
+| table_optimizing_major_failed_count                  | Counter | catalog, 
database, table | Count of failed major optimizing process since ams started    
                         |
+| table_optimizing_full_total_count                    | Counter | catalog, 
database, table | Count of full optimizing process since ams started            
                         |
+| table_optimizing_full_failed_count                   | Counter | catalog, 
database, table | Count of failed full optimizing process since ams started     
                         |
+| table_optimizing_status_in_idle                      | Gauge   | catalog, 
database, table | If currently table is in idle status                          
                         |
+| table_optimizing_status_in_pending                   | Gauge   | catalog, 
database, table | If currently table is in pending status                       
                         |
+| table_optimizing_status_in_planning                  | Gauge   | catalog, 
database, table | If currently table is in planning status                      
                         |
+| table_optimizing_status_in_executing                 | Gauge   | catalog, 
database, table | If currently table is in executing status                     
                         |
+| table_optimizing_status_in_committing                | Gauge   | catalog, 
database, table | If currently table is in committing status                    
                         |
+| table_optimizing_since_last_minor_optimization_mills | Gauge   | catalog, 
database, table | Duration in milliseconds since last successful minor 
optimization                      |
+| table_optimizing_since_last_major_optimization_mills | Gauge   | catalog, 
database, table | Duration in milliseconds since last successful major 
optimization                      |
+| table_optimizing_since_last_full_optimization_mills  | Gauge   | catalog, 
database, table | Duration in milliseconds since last successful full 
optimization                       |
+| table_optimizing_since_last_optimization_mills       | Gauge   | catalog, 
database, table | Duration in milliseconds since last successful optimization   
                         |
+| table_optimizing_lag_duration_mills                  | Gauge   | catalog, 
database, table | Duration in milliseconds between last self-optimizing 
snapshot and refreshed snapshot  |
 
 ## Optimizer Group metrics
 

Reply via email to