This is an automated email from the ASF dual-hosted git repository.
jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 9b05fbf11 [AMORO-2993] Enrich optimizing metrics (#2994)
9b05fbf11 is described below
commit 9b05fbf11e37ad69b050180e8b26064466586807
Author: Xavier Bai <[email protected]>
AuthorDate: Mon Aug 19 14:07:47 2024 +0800
[AMORO-2993] Enrich optimizing metrics (#2994)
* Rebase orphan metrics
* rebase master
* add doc
* rename
* adjust
* fix some bugs
---
.../amoro/server/table/TableOptimizingMetrics.java | 156 ++++++++++++++++++++-
.../apache/amoro/server/table/TableRuntime.java | 44 ++++--
.../amoro/server/utils/IcebergTableUtil.java | 10 ++
.../amoro-bin/conf/plugins/metric-reporters.yaml | 2 +-
docs/user-guides/metrics.md | 45 +++---
5 files changed, 227 insertions(+), 30 deletions(-)
diff --git
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableOptimizingMetrics.java
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableOptimizingMetrics.java
index d97a64e2e..13a2f2119 100644
---
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableOptimizingMetrics.java
+++
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableOptimizingMetrics.java
@@ -27,11 +27,16 @@ import org.apache.amoro.api.metrics.Gauge;
import org.apache.amoro.api.metrics.Metric;
import org.apache.amoro.api.metrics.MetricDefine;
import org.apache.amoro.api.metrics.MetricKey;
+import org.apache.amoro.server.AmoroServiceConstants;
import org.apache.amoro.server.metrics.MetricRegistry;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.optimizing.OptimizingType;
+import org.apache.amoro.server.optimizing.maintainer.IcebergTableMaintainer;
import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableMap;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
+import org.apache.amoro.shade.guava32.com.google.common.primitives.Longs;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
import java.util.List;
@@ -163,6 +168,37 @@ public class TableOptimizingMetrics {
.withTags("catalog", "database", "table")
.build();
+ /**
+ * Gauge: milliseconds elapsed since the last successful minor optimizing process was recorded,
+ * or INVALID_TIME when none has been recorded.
+ */
+ public static final MetricDefine
TABLE_OPTIMIZING_SINCE_LAST_MINOR_OPTIMIZATION =
+ defineGauge("table_optimizing_since_last_minor_optimization_mills")
+ .withDescription("Duration in milliseconds since last successful
minor optimization")
+ .withTags("catalog", "database", "table")
+ .build();
+
+ /**
+ * Gauge: milliseconds elapsed since the last successful major optimizing process was recorded,
+ * or INVALID_TIME when none has been recorded.
+ */
+ public static final MetricDefine
TABLE_OPTIMIZING_SINCE_LAST_MAJOR_OPTIMIZATION =
+ defineGauge("table_optimizing_since_last_major_optimization_mills")
+ .withDescription("Duration in milliseconds since last successful
major optimization")
+ .withTags("catalog", "database", "table")
+ .build();
+
+ /**
+ * Gauge: milliseconds elapsed since the last successful full optimizing process was recorded,
+ * or INVALID_TIME when none has been recorded.
+ */
+ public static final MetricDefine
TABLE_OPTIMIZING_SINCE_LAST_FULL_OPTIMIZATION =
+ defineGauge("table_optimizing_since_last_full_optimization_mills")
+ .withDescription("Duration in milliseconds since last successful
full optimization")
+ .withTags("catalog", "database", "table")
+ .build();
+
+ /**
+ * Gauge: milliseconds elapsed since the latest known optimizing time of any type (tracked in
+ * {@code lastOptimizingTime}), or INVALID_TIME when it is unknown.
+ */
+ public static final MetricDefine TABLE_OPTIMIZING_SINCE_LAST_OPTIMIZATION =
+ defineGauge("table_optimizing_since_last_optimization_mills")
+ .withDescription("Duration in milliseconds since last successful
optimization")
+ .withTags("catalog", "database", "table")
+ .build();
+
+ /**
+ * Gauge: lag in milliseconds between the latest non-maintenance snapshot that added files and the
+ * latest known optimizing time; computed by {@code OptimizingLagDurationGauge}.
+ */
+ public static final MetricDefine TABLE_OPTIMIZING_LAG_DURATION =
+ defineGauge("table_optimizing_lag_duration_mills")
+ .withDescription(
+ "Duration in milliseconds between last self-optimizing snapshot
and refreshed snapshot")
+ .withTags("catalog", "database", "table")
+ .build();
+
private final Counter processTotalCount = new Counter();
private final Counter processFailedCount = new Counter();
private final Counter minorTotalCount = new Counter();
@@ -176,6 +212,9 @@ public class TableOptimizingMetrics {
private OptimizingStatus optimizingStatus = OptimizingStatus.IDLE;
private long statusSetTimestamp = System.currentTimeMillis();
+  // Timestamps of the last successful minor/major/full optimizing process (the process plan time
+  // is passed in by processComplete); INVALID_TIME until one is recorded.
+  private long lastMinorTime = AmoroServiceConstants.INVALID_TIME;
+  private long lastMajorTime = AmoroServiceConstants.INVALID_TIME;
+  private long lastFullTime = AmoroServiceConstants.INVALID_TIME;
+  // Commit time of the newest refreshed snapshot that was not an Amoro maintenance commit and
+  // added at least one file.
+  private long lastNonMaintainedTime = AmoroServiceConstants.INVALID_TIME;
+  // Latest known optimizing time: max of optimizing-snapshot commit time and per-type times.
+  private long lastOptimizingTime = AmoroServiceConstants.INVALID_TIME;
private final List<MetricKey> registeredMetricKeys = Lists.newArrayList();
private MetricRegistry globalRegistry;
@@ -241,6 +280,23 @@ public class TableOptimizingMetrics {
registerMetric(registry, TABLE_OPTIMIZING_FULL_TOTAL_COUNT,
fullTotalCount);
registerMetric(registry, TABLE_OPTIMIZING_FULL_FAILED_COUNT,
fullFailedCount);
+ // register last optimizing duration metrics
+ registerMetric(
+ registry,
+ TABLE_OPTIMIZING_SINCE_LAST_MINOR_OPTIMIZATION,
+ new LastOptimizingDurationGauge(OptimizingType.MINOR));
+ registerMetric(
+ registry,
+ TABLE_OPTIMIZING_SINCE_LAST_MAJOR_OPTIMIZATION,
+ new LastOptimizingDurationGauge(OptimizingType.MAJOR));
+ registerMetric(
+ registry,
+ TABLE_OPTIMIZING_SINCE_LAST_FULL_OPTIMIZATION,
+ new LastOptimizingDurationGauge(OptimizingType.FULL));
+ registerMetric(
+ registry, TABLE_OPTIMIZING_SINCE_LAST_OPTIMIZATION, new
LastOptimizingDurationGauge());
+ registerMetric(registry, TABLE_OPTIMIZING_LAG_DURATION, new
OptimizingLagDurationGauge());
+
globalRegistry = registry;
}
}
@@ -262,13 +318,60 @@ public class TableOptimizingMetrics {
this.statusSetTimestamp = statusSetTimestamp;
}
+  /**
+   * Records the timestamp of the most recent successful optimizing process of one type.
+   *
+   * <p>{@code processComplete} invokes this for successful processes, and the table runtime uses it
+   * to seed the per-type values when it is constructed. Types other than MINOR, MAJOR and FULL are
+   * ignored.
+   *
+   * @param processType optimizing type whose timestamp is updated
+   * @param lastTime timestamp to store for that type
+   */
+  public void lastOptimizingTime(OptimizingType processType, long lastTime) {
+    switch (processType) {
+      case MINOR:
+        lastMinorTime = lastTime;
+        return;
+      case MAJOR:
+        lastMajorTime = lastTime;
+        return;
+      case FULL:
+        lastFullTime = lastTime;
+        return;
+      default:
+        // no per-type timestamp is tracked for any other optimizing type
+        break;
+    }
+  }
+
+  /**
+   * Advances the last non-maintenance snapshot time using a freshly refreshed snapshot.
+   *
+   * <p>Snapshots are skipped when they are {@code null}, when any summary value belongs to {@code
+   * IcebergTableMaintainer.AMORO_MAINTAIN_COMMITS}, or when the summary reports zero added files.
+   * The stored time never moves backwards.
+   *
+   * @param snapshot refreshed snapshot, possibly {@code null}
+   */
+  public void nonMaintainedSnapshotTime(Snapshot snapshot) {
+    if (snapshot == null || isMaintenanceCommit(snapshot) || !addsFiles(snapshot)) {
+      return;
+    }
+    lastNonMaintainedTime = Math.max(lastNonMaintainedTime, snapshot.timestampMillis());
+  }
+
+  /** Returns whether any summary value of {@code snapshot} is an Amoro maintenance commit marker. */
+  private static boolean isMaintenanceCommit(Snapshot snapshot) {
+    for (String summaryValue : snapshot.summary().values()) {
+      if (IcebergTableMaintainer.AMORO_MAINTAIN_COMMITS.contains(summaryValue)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /** Returns whether the snapshot summary reports a non-zero count of added files. */
+  private static boolean addsFiles(Snapshot snapshot) {
+    String addedFiles = snapshot.summary().getOrDefault(SnapshotSummary.ADDED_FILES_PROP, "0");
+    return Long.parseLong(addedFiles) != 0;
+  }
+
+  /**
+   * Updates the latest known optimizing time read by the "since last optimization" and lag gauges.
+   *
+   * <p>The new value is the maximum of the currently stored time, the per-type (minor, major, full)
+   * optimizing times and, when {@code snapshot} is non-null, the snapshot's commit timestamp. The
+   * per-type times are folded in even when no optimizing snapshot was found (for example because
+   * it is no longer in the table's snapshot history), so a known successful process is not hidden
+   * behind {@code INVALID_TIME}.
+   *
+   * @param snapshot latest optimizing snapshot, or {@code null} when none was found
+   */
+  public void lastOptimizingSnapshotTime(Snapshot snapshot) {
+    long latest = Longs.max(lastOptimizingTime, lastMinorTime, lastMajorTime, lastFullTime);
+    if (snapshot != null) {
+      // Include the commit time of the latest optimizing snapshot when one is available.
+      latest = Math.max(latest, snapshot.timestampMillis());
+    }
+    this.lastOptimizingTime = latest;
+  }
+
/**
* Handle table self optimizing process completed event.
*
* @param processType optimizing process type.
* @param success is optimizing process success.
*/
- public void processComplete(OptimizingType processType, boolean success) {
+ public void processComplete(OptimizingType processType, boolean success,
long planTime) {
processTotalCount.inc();
Counter totalCounter = null;
Counter failedCounter = null;
@@ -286,6 +389,9 @@ public class TableOptimizingMetrics {
failedCounter = fullFailedCount;
break;
}
+ if (success) {
+ lastOptimizingTime(processType, planTime);
+ }
if (totalCounter != null) {
totalCounter.inc();
}
@@ -334,6 +440,54 @@ public class TableOptimizingMetrics {
}
}
+  /**
+   * Gauge reporting the milliseconds elapsed since a recorded optimizing time: the per-type time
+   * for MINOR, MAJOR or FULL, or the overall latest optimizing time when built without a type.
+   *
+   * <p>Reports {@code AmoroServiceConstants.INVALID_TIME} when the relevant timestamp is not
+   * positive, or when the type is none of MINOR, MAJOR and FULL.
+   */
+  class LastOptimizingDurationGauge implements Gauge<Long> {
+    /** Type to track; {@code null} means the overall latest optimizing time. */
+    final OptimizingType optimizingType;
+
+    LastOptimizingDurationGauge(OptimizingType optimizingType) {
+      this.optimizingType = optimizingType;
+    }
+
+    LastOptimizingDurationGauge() {
+      this(null);
+    }
+
+    @Override
+    public Long getValue() {
+      long recordedTime;
+      if (optimizingType == null) {
+        recordedTime = lastOptimizingTime;
+      } else {
+        switch (optimizingType) {
+          case MINOR:
+            recordedTime = lastMinorTime;
+            break;
+          case MAJOR:
+            recordedTime = lastMajorTime;
+            break;
+          case FULL:
+            recordedTime = lastFullTime;
+            break;
+          default:
+            return AmoroServiceConstants.INVALID_TIME;
+        }
+      }
+      return elapsedSince(recordedTime);
+    }
+
+    /** Milliseconds from {@code recordedTime} until now, or INVALID_TIME if it is not positive. */
+    private long elapsedSince(long recordedTime) {
+      if (recordedTime <= 0) {
+        return AmoroServiceConstants.INVALID_TIME;
+      }
+      return System.currentTimeMillis() - recordedTime;
+    }
+  }
+
+  /**
+   * Gauge reporting how far, in milliseconds, the latest known optimizing time trails the newest
+   * non-maintenance snapshot that added files.
+   *
+   * <p>Reports {@code AmoroServiceConstants.INVALID_TIME} until both timestamps are known. When
+   * optimizing has caught up (its time is at or after that snapshot) the lag is 0 rather than a
+   * negative duration.
+   */
+  class OptimizingLagDurationGauge implements Gauge<Long> {
+    @Override
+    public Long getValue() {
+      if (lastNonMaintainedTime == AmoroServiceConstants.INVALID_TIME
+          || lastOptimizingTime == AmoroServiceConstants.INVALID_TIME) {
+        return AmoroServiceConstants.INVALID_TIME;
+      }
+      // An optimizing commit is made after the writes it compacts, so a caught-up table would
+      // otherwise yield a negative difference; clamp it to zero lag.
+      return Math.max(0L, lastNonMaintainedTime - lastOptimizingTime);
+    }
+  }
+
class IsInStatusGauge implements Gauge<Long> {
final String targetStatus;
diff --git
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntime.java
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntime.java
index 9cf17b4fc..ae9ce9b93 100644
---
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntime.java
+++
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/table/TableRuntime.java
@@ -40,7 +40,10 @@ import org.apache.amoro.server.table.blocker.TableBlocker;
import org.apache.amoro.server.utils.IcebergTableUtil;
import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
+import org.apache.amoro.table.BaseTable;
+import org.apache.amoro.table.ChangeTable;
import org.apache.amoro.table.MixedTable;
+import org.apache.amoro.table.UnkeyedTable;
import org.apache.iceberg.Snapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -133,6 +136,9 @@ public class TableRuntime extends StatedPersistentBase {
this.pendingInput = tableRuntimeMeta.getPendingInput();
optimizingMetrics = new TableOptimizingMetrics(tableIdentifier);
optimizingMetrics.statusChanged(optimizingStatus,
this.currentStatusStartTime);
+ optimizingMetrics.lastOptimizingTime(OptimizingType.MINOR,
this.lastMinorOptimizingTime);
+ optimizingMetrics.lastOptimizingTime(OptimizingType.MAJOR,
this.lastMajorOptimizingTime);
+ optimizingMetrics.lastOptimizingTime(OptimizingType.FULL,
this.lastFullOptimizingTime);
orphanFilesCleaningMetrics = new
TableOrphanFilesCleaningMetrics(tableIdentifier);
}
@@ -300,10 +306,10 @@ public class TableRuntime extends StatedPersistentBase {
lastFullOptimizingTime = optimizingProcess.getPlanTime();
}
}
+ optimizingMetrics.processComplete(processType, success,
optimizingProcess.getPlanTime());
updateOptimizingStatus(OptimizingStatus.IDLE);
optimizingProcess = null;
persistUpdatingRuntime();
- optimizingMetrics.processComplete(processType, success);
tableHandler.handleTableChanged(this, originalStatus);
});
}
@@ -316,12 +322,16 @@ public class TableRuntime extends StatedPersistentBase {
private boolean refreshSnapshots(AmoroTable<?> amoroTable) {
MixedTable table = (MixedTable) amoroTable.originalTable();
+
+ long lastSnapshotId = currentSnapshotId;
if (table.isKeyedTable()) {
- long lastSnapshotId = currentSnapshotId;
long changeSnapshotId = currentChangeSnapshotId;
- currentSnapshotId =
IcebergTableUtil.getSnapshotId(table.asKeyedTable().baseTable(), false);
- currentChangeSnapshotId =
- IcebergTableUtil.getSnapshotId(table.asKeyedTable().changeTable(),
false);
+ ChangeTable changeTable = table.asKeyedTable().changeTable();
+ BaseTable baseTable = table.asKeyedTable().baseTable();
+
+ currentChangeSnapshotId = doRefreshSnapshots(changeTable);
+ currentSnapshotId = doRefreshSnapshots(baseTable);
+
if (currentSnapshotId != lastSnapshotId || currentChangeSnapshotId !=
changeSnapshotId) {
LOG.info(
"Refreshing table {} with base snapshot id {} and change snapshot
id {}",
@@ -331,9 +341,7 @@ public class TableRuntime extends StatedPersistentBase {
return true;
}
} else {
- long lastSnapshotId = currentSnapshotId;
- Snapshot currentSnapshot = table.asUnkeyedTable().currentSnapshot();
- currentSnapshotId = currentSnapshot == null ? -1 :
currentSnapshot.snapshotId();
+ currentSnapshotId = doRefreshSnapshots((UnkeyedTable) table);
if (currentSnapshotId != lastSnapshotId) {
LOG.info(
"Refreshing table {} with base snapshot id {}", tableIdentifier,
currentSnapshotId);
@@ -343,6 +351,26 @@ public class TableRuntime extends StatedPersistentBase {
return false;
}
+  /**
+   * Reads the current snapshot of {@code table} and feeds the snapshot-based optimizing metrics.
+   *
+   * <p>The current snapshot goes to {@code nonMaintainedSnapshotTime}; the snapshot returned by
+   * {@code IcebergTableUtil.findLatestOptimizingSnapshot} (or {@code null}) goes to {@code
+   * lastOptimizingSnapshotTime}.
+   *
+   * @param table table to inspect
+   * @return id of the current snapshot, or {@code AmoroServiceConstants.INVALID_SNAPSHOT_ID} when
+   *     {@code IcebergTableUtil.getSnapshot} returns {@code null}
+   */
+  private long doRefreshSnapshots(UnkeyedTable table) {
+    Snapshot snapshot = IcebergTableUtil.getSnapshot(table, false);
+    optimizingMetrics.nonMaintainedSnapshotTime(snapshot);
+    Snapshot optimizingSnapshot =
+        IcebergTableUtil.findLatestOptimizingSnapshot(table).orElse(null);
+    optimizingMetrics.lastOptimizingSnapshotTime(optimizingSnapshot);
+    return snapshot == null ? AmoroServiceConstants.INVALID_SNAPSHOT_ID : snapshot.snapshotId();
+  }
+
+ /** Returns the {@code pendingInput} held by this runtime (set from the runtime meta on construction). */
public OptimizingEvaluator.PendingInput getPendingInput() {
return pendingInput;
}
diff --git
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/utils/IcebergTableUtil.java
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/utils/IcebergTableUtil.java
index 911c97ad9..2a78dc34b 100644
---
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/utils/IcebergTableUtil.java
+++
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/utils/IcebergTableUtil.java
@@ -19,6 +19,7 @@
package org.apache.amoro.server.utils;
import org.apache.amoro.IcebergFileEntry;
+import org.apache.amoro.api.CommitMetaProducer;
import org.apache.amoro.scan.TableEntriesScan;
import org.apache.amoro.server.AmoroServiceConstants;
import org.apache.amoro.server.table.BasicTableSnapshot;
@@ -33,6 +34,7 @@ import
org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
import org.apache.amoro.table.MixedTable;
import org.apache.amoro.utils.TableFileUtil;
import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.DataOperations;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
@@ -95,6 +97,14 @@ public class IcebergTableUtil {
return Optional.ofNullable(Iterables.tryFind(snapshots,
predicate).orNull());
}
+  /**
+   * Looks up a snapshot written by Amoro self-optimizing: a {@code REPLACE} snapshot whose summary
+   * contains the {@code OPTIMIZE} commit-producer name as a value.
+   *
+   * <p>NOTE(review): "latest" relies on {@code findFirstMatchSnapshot} visiting snapshots
+   * newest-first — confirm against that method.
+   *
+   * @param table table whose snapshots are searched
+   * @return the matching snapshot, or {@link Optional#empty()} when none matches
+   */
+  public static Optional<Snapshot> findLatestOptimizingSnapshot(Table table) {
+    return findFirstMatchSnapshot(
+        table,
+        snapshot -> {
+          boolean producedByOptimizing =
+              snapshot.summary().containsValue(CommitMetaProducer.OPTIMIZE.name());
+          return producedByOptimizing && DataOperations.REPLACE.equals(snapshot.operation());
+        });
+  }
+
public static Set<String> getAllContentFilePath(Table internalTable) {
Set<String> validFilesPath = new HashSet<>();
diff --git
a/amoro-ams/dist/src/main/amoro-bin/conf/plugins/metric-reporters.yaml
b/amoro-ams/dist/src/main/amoro-bin/conf/plugins/metric-reporters.yaml
index a68a772b2..1a9639f6a 100644
--- a/amoro-ams/dist/src/main/amoro-bin/conf/plugins/metric-reporters.yaml
+++ b/amoro-ams/dist/src/main/amoro-bin/conf/plugins/metric-reporters.yaml
@@ -20,6 +20,6 @@
#metric-reporters:
# - name: prometheus-exporter # configs for prometheus exporter
-# enabled: false
+# enabled: true
# properties:
# port: 7001
\ No newline at end of file
diff --git a/docs/user-guides/metrics.md b/docs/user-guides/metrics.md
index 83a4010a4..c84d565d8 100644
--- a/docs/user-guides/metrics.md
+++ b/docs/user-guides/metrics.md
@@ -37,26 +37,31 @@ Amoro has supported built-in metrics to measure status of
table self-optimizing
## Self-optimizing metrics
-| Metric Name | Type | Tags
| Description |
-|---------------------------------------------------|---------|--------------------------|--------------------------------------------------------------|
-| table_optimizing_status_idle_duration_mills | Gauge | catalog,
database, table | Duration in milliseconds after table be in idle status |
-| table_optimizing_status_pending_duration_mills | Gauge | catalog,
database, table | Duration in milliseconds after table be in pending status |
-| table_optimizing_status_planning_duration_mills | Gauge | catalog,
database, table | Duration in milliseconds after table be in planning status |
-| table_optimizing_status_executing_duration_mills | Gauge | catalog,
database, table | Duration in milliseconds after table be in executing status |
-| table_optimizing_status_committing_duration_mills | Gauge | catalog,
database, table | Duration in milliseconds after table be in committing status |
-| table_optimizing_process_total_count | Counter | catalog,
database, table | Count of all optimizing process since ams started |
-| table_optimizing_process_failed_count | Counter | catalog,
database, table | Count of failed optimizing process since ams started |
-| table_optimizing_minor_total_count | Counter | catalog,
database, table | Count of minor optimizing process since ams started |
-| table_optimizing_minor_failed_count | Counter | catalog,
database, table | Count of failed minor optimizing process since ams started |
-| table_optimizing_major_total_count | Counter | catalog,
database, table | Count of major optimizing process since ams started |
-| table_optimizing_major_failed_count | Counter | catalog,
database, table | Count of failed major optimizing process since ams started |
-| table_optimizing_full_total_count | Counter | catalog,
database, table | Count of full optimizing rocess since ams started |
-| table_optimizing_full_failed_count | Counter | catalog,
database, table | Count of failed full optimizing process since ams started |
-| table_optimizing_status_in_idle | Gauge | catalog,
database, table | If currently table is in idle status |
-| table_optimizing_status_in_pending | Gauge | catalog,
database, table | If currently table is in pending status |
-| table_optimizing_status_in_planning | Gauge | catalog,
database, table | If currently table is in planning status |
-| table_optimizing_status_in_executing | Gauge | catalog,
database, table | If currently table is in executing status |
-| table_optimizing_status_in_committing | Gauge | catalog,
database, table | If currently table is in committing status |
+| Metric Name                                          | Type    | Tags                     | Description |
+|------------------------------------------------------|---------|--------------------------|----------------------------------------------------------------------------------------|
+| table_optimizing_status_idle_duration_mills          | Gauge   | catalog, database, table | Duration in milliseconds since the table entered idle status |
+| table_optimizing_status_pending_duration_mills       | Gauge   | catalog, database, table | Duration in milliseconds since the table entered pending status |
+| table_optimizing_status_planning_duration_mills      | Gauge   | catalog, database, table | Duration in milliseconds since the table entered planning status |
+| table_optimizing_status_executing_duration_mills     | Gauge   | catalog, database, table | Duration in milliseconds since the table entered executing status |
+| table_optimizing_status_committing_duration_mills    | Gauge   | catalog, database, table | Duration in milliseconds since the table entered committing status |
+| table_optimizing_process_total_count                 | Counter | catalog, database, table | Count of all optimizing processes since AMS started |
+| table_optimizing_process_failed_count                | Counter | catalog, database, table | Count of failed optimizing processes since AMS started |
+| table_optimizing_minor_total_count                   | Counter | catalog, database, table | Count of minor optimizing processes since AMS started |
+| table_optimizing_minor_failed_count                  | Counter | catalog, database, table | Count of failed minor optimizing processes since AMS started |
+| table_optimizing_major_total_count                   | Counter | catalog, database, table | Count of major optimizing processes since AMS started |
+| table_optimizing_major_failed_count                  | Counter | catalog, database, table | Count of failed major optimizing processes since AMS started |
+| table_optimizing_full_total_count                    | Counter | catalog, database, table | Count of full optimizing processes since AMS started |
+| table_optimizing_full_failed_count                   | Counter | catalog, database, table | Count of failed full optimizing processes since AMS started |
+| table_optimizing_status_in_idle                      | Gauge   | catalog, database, table | Whether the table is currently in idle status |
+| table_optimizing_status_in_pending                   | Gauge   | catalog, database, table | Whether the table is currently in pending status |
+| table_optimizing_status_in_planning                  | Gauge   | catalog, database, table | Whether the table is currently in planning status |
+| table_optimizing_status_in_executing                 | Gauge   | catalog, database, table | Whether the table is currently in executing status |
+| table_optimizing_status_in_committing                | Gauge   | catalog, database, table | Whether the table is currently in committing status |
+| table_optimizing_since_last_minor_optimization_mills | Gauge   | catalog, database, table | Duration in milliseconds since the last successful minor optimization |
+| table_optimizing_since_last_major_optimization_mills | Gauge   | catalog, database, table | Duration in milliseconds since the last successful major optimization |
+| table_optimizing_since_last_full_optimization_mills  | Gauge   | catalog, database, table | Duration in milliseconds since the last successful full optimization |
+| table_optimizing_since_last_optimization_mills       | Gauge   | catalog, database, table | Duration in milliseconds since the last successful optimization |
+| table_optimizing_lag_duration_mills                  | Gauge   | catalog, database, table | Duration in milliseconds between the last self-optimizing snapshot and the latest refreshed snapshot |
## Optimizer Group metrics