This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 08ce6b26be Add lastCommittedSnapshotId commit metric and document
missing metrics (#7589)
08ce6b26be is described below
commit 08ce6b26be8366eb0172643379a15a2c6ff6cf27
Author: junmuz <[email protected]>
AuthorDate: Fri Jun 5 03:20:03 2026 +0100
Add lastCommittedSnapshotId commit metric and document missing metrics
(#7589)
---
docs/docs/maintenance/metrics.md | 10 ++++++++++
.../org/apache/paimon/operation/FileStoreCommitImpl.java | 15 +++++++++++----
.../apache/paimon/operation/metrics/CommitMetrics.java | 4 ++++
.../org/apache/paimon/operation/metrics/CommitStats.java | 10 +++++++++-
.../paimon/operation/metrics/CommitMetricsTest.java | 13 +++++++++++--
.../apache/paimon/operation/metrics/CommitStatsTest.java | 12 +++++++++---
.../source/metrics/FileStoreSourceReaderMetrics.java | 1 -
7 files changed, 54 insertions(+), 11 deletions(-)
diff --git a/docs/docs/maintenance/metrics.md b/docs/docs/maintenance/metrics.md
index f6e1576dea..ef764c13e1 100644
--- a/docs/docs/maintenance/metrics.md
+++ b/docs/docs/maintenance/metrics.md
@@ -60,6 +60,11 @@ Below is lists of Paimon built-in metrics. They are
summarized into types of sca
<td>Histogram</td>
<td>Distributions of the time taken by the last few scans.</td>
</tr>
+ <tr>
+ <td>lastScannedSnapshotId</td>
+ <td>Gauge</td>
+ <td>The snapshot ID scanned in the last scan. 0 if no scan has
occurred.</td>
+ </tr>
<tr>
<td>lastScannedManifests</td>
<td>Gauge</td>
@@ -179,6 +184,11 @@ Below is lists of Paimon built-in metrics. They are
summarized into types of sca
<td>Gauge</td>
<td>Total size of the output files for the last compaction.</td>
</tr>
+ <tr>
+ <td>lastCommittedSnapshotId</td>
+ <td>Gauge</td>
+ <td>The snapshot ID created by the last commit. -1 if no commit
has occurred.</td>
+ </tr>
</tbody>
</table>
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index ddab7a0419..7cca259cbf 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -159,6 +159,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
private boolean ignoreEmptyCommit;
private CommitMetrics commitMetrics;
private boolean appendCommitCheckConflict = false;
+ private long lastCommittedSnapshotId = -1L;
public FileStoreCommitImpl(
SnapshotCommit snapshotCommit,
@@ -376,7 +377,8 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
changes.compactChangelog,
commitDuration,
generatedSnapshot,
- attempts);
+ attempts,
+ lastCommittedSnapshotId);
}
}
return generatedSnapshot;
@@ -389,7 +391,8 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
List<ManifestEntry> compactChangelogFiles,
long commitDuration,
int generatedSnapshots,
- int attempts) {
+ int attempts,
+ long lastCommittedSnapshotId) {
CommitStats commitStats =
new CommitStats(
appendTableFiles,
@@ -398,7 +401,8 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
compactChangelogFiles,
commitDuration,
generatedSnapshots,
- attempts);
+ attempts,
+ lastCommittedSnapshotId);
commitMetrics.reportCommit(commitStats);
}
@@ -540,7 +544,8 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
emptyList(),
commitDuration,
generatedSnapshot,
- attempts);
+ attempts,
+ lastCommittedSnapshotId);
}
}
return generatedSnapshot;
@@ -831,6 +836,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
if (snapshot.commitUser().equals(commitUser)
&& snapshot.commitIdentifier() == identifier
&& snapshot.commitKind() == commitKind) {
+ lastCommittedSnapshotId = snapshot.id();
return new SuccessCommitResult();
}
}
@@ -1102,6 +1108,7 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
if (strictModeChecker != null) {
strictModeChecker.update(newSnapshotId);
}
+ lastCommittedSnapshotId = newSnapshotId;
CommitCallback.Context context =
new CommitCallback.Context(
finalBaseFiles, finalDeltaFiles, indexFiles,
newSnapshot, identifier);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitMetrics.java
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitMetrics.java
index c89bae8c9a..874635a9fd 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitMetrics.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitMetrics.java
@@ -74,6 +74,7 @@ public class CommitMetrics {
public static final String LAST_COMPACTION_INPUT_FILE_SIZE =
"lastCompactionInputFileSize";
public static final String LAST_COMPACTION_OUTPUT_FILE_SIZE =
"lastCompactionOutputFileSize";
+ public static final String LAST_COMMITTED_SNAPSHOT_ID =
"lastCommittedSnapshotId";
private void registerGenericCommitMetrics() {
metricGroup.gauge(
@@ -126,6 +127,9 @@ public class CommitMetrics {
metricGroup.gauge(
LAST_COMPACTION_OUTPUT_FILE_SIZE,
() -> latestCommit == null ? 0L :
latestCommit.getCompactionOutputFileSize());
+ metricGroup.gauge(
+ LAST_COMMITTED_SNAPSHOT_ID,
+ () -> latestCommit == null ? -1L :
latestCommit.getLastCommittedSnapshotId());
}
public void reportCommit(CommitStats commitStats) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitStats.java
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitStats.java
index 11d6854270..ed53b611c2 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitStats.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitStats.java
@@ -53,6 +53,7 @@ public class CommitStats {
private final long generatedSnapshots;
private final long numPartitionsWritten;
private final long numBucketsWritten;
+ private final long lastCommittedSnapshotId;
public CommitStats(
List<ManifestEntry> appendTableFiles,
@@ -61,7 +62,8 @@ public class CommitStats {
List<ManifestEntry> compactChangelogFiles,
long commitDuration,
int generatedSnapshots,
- int attempts) {
+ int attempts,
+ long lastCommittedSnapshotId) {
List<ManifestEntry> addedTableFiles =
appendTableFiles.stream()
.filter(f -> FileKind.ADD.equals(f.kind()))
@@ -110,6 +112,7 @@ public class CommitStats {
this.duration = commitDuration;
this.generatedSnapshots = generatedSnapshots;
this.attempts = attempts;
+ this.lastCommittedSnapshotId = lastCommittedSnapshotId;
}
@VisibleForTesting
@@ -236,4 +239,9 @@ public class CommitStats {
public long getCompactionOutputFileSize() {
return compactionOutputFileSize;
}
+
+ @VisibleForTesting
+ protected long getLastCommittedSnapshotId() {
+ return lastCommittedSnapshotId;
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CommitMetricsTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CommitMetricsTest.java
index 6a79a0ae58..9b475f34c8 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CommitMetricsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CommitMetricsTest.java
@@ -100,6 +100,10 @@ public class CommitMetricsTest {
registeredGenericMetrics.get(
CommitMetrics.LAST_CHANGELOG_RECORDS_COMMIT_COMPACTED);
+ Gauge<Long> lastCommittedSnapshotId =
+ (Gauge<Long>)
+
registeredGenericMetrics.get(CommitMetrics.LAST_COMMITTED_SNAPSHOT_ID);
+
assertThat(lastCommitDuration.getValue()).isEqualTo(0);
assertThat(commitDuration.getCount()).isEqualTo(0);
assertThat(commitDuration.getStatistics().size()).isEqualTo(0);
@@ -117,6 +121,7 @@ public class CommitMetricsTest {
assertThat(lastChangelogRecordsAppended.getValue()).isEqualTo(0);
assertThat(lastDeltaRecordsCompacted.getValue()).isEqualTo(0);
assertThat(lastChangelogRecordsCompacted.getValue()).isEqualTo(0);
+ assertThat(lastCommittedSnapshotId.getValue()).isEqualTo(-1);
// report once
reportOnce(commitMetrics);
@@ -145,6 +150,7 @@ public class CommitMetricsTest {
assertThat(lastChangelogRecordsAppended.getValue()).isEqualTo(503);
assertThat(lastDeltaRecordsCompacted.getValue()).isEqualTo(613);
assertThat(lastChangelogRecordsCompacted.getValue()).isEqualTo(512);
+ assertThat(lastCommittedSnapshotId.getValue()).isEqualTo(42);
// report again
reportAgain(commitMetrics);
@@ -173,6 +179,7 @@ public class CommitMetricsTest {
assertThat(lastChangelogRecordsAppended.getValue()).isEqualTo(213);
assertThat(lastDeltaRecordsCompacted.getValue()).isEqualTo(506);
assertThat(lastChangelogRecordsCompacted.getValue()).isEqualTo(601);
+ assertThat(lastCommittedSnapshotId.getValue()).isEqualTo(99);
}
private void reportOnce(CommitMetrics commitMetrics) {
@@ -199,7 +206,8 @@ public class CommitMetricsTest {
compactChangelogFiles,
200,
2,
- 1);
+ 1,
+ 42L);
commitMetrics.reportCommit(commitStats);
}
@@ -228,7 +236,8 @@ public class CommitMetricsTest {
compactChangelogFiles,
500,
1,
- 2);
+ 2,
+ 99L);
commitMetrics.reportCommit(commitStats);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CommitStatsTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CommitStatsTest.java
index e4a2a7fd22..e705dfd410 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CommitStatsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CommitStatsTest.java
@@ -78,7 +78,8 @@ public class CommitStatsTest {
Collections.emptyList(),
0,
0,
- 1);
+ 1,
+ -1L);
assertThat(commitStats.getTableFilesAdded()).isEqualTo(0);
assertThat(commitStats.getTableFilesDeleted()).isEqualTo(0);
assertThat(commitStats.getTableFilesAppended()).isEqualTo(0);
@@ -94,6 +95,7 @@ public class CommitStatsTest {
assertThat(commitStats.getNumBucketsWritten()).isEqualTo(0);
assertThat(commitStats.getDuration()).isEqualTo(0);
assertThat(commitStats.getAttempts()).isEqualTo(1);
+ assertThat(commitStats.getLastCommittedSnapshotId()).isEqualTo(-1);
}
@Test
@@ -106,7 +108,8 @@ public class CommitStatsTest {
Collections.emptyList(),
3000,
1,
- 2);
+ 2,
+ 5L);
assertThat(commitStats.getTableFilesAdded()).isEqualTo(2);
assertThat(commitStats.getTableFilesDeleted()).isEqualTo(0);
assertThat(commitStats.getTableFilesAppended()).isEqualTo(2);
@@ -122,6 +125,7 @@ public class CommitStatsTest {
assertThat(commitStats.getNumBucketsWritten()).isEqualTo(2);
assertThat(commitStats.getDuration()).isEqualTo(3000);
assertThat(commitStats.getAttempts()).isEqualTo(2);
+ assertThat(commitStats.getLastCommittedSnapshotId()).isEqualTo(5);
}
@Test
@@ -134,7 +138,8 @@ public class CommitStatsTest {
compactChangelogFiles,
3000,
2,
- 2);
+ 2,
+ 10L);
assertThat(commitStats.getTableFilesAdded()).isEqualTo(4);
assertThat(commitStats.getTableFilesDeleted()).isEqualTo(1);
assertThat(commitStats.getTableFilesAppended()).isEqualTo(2);
@@ -150,5 +155,6 @@ public class CommitStatsTest {
assertThat(commitStats.getNumBucketsWritten()).isEqualTo(3);
assertThat(commitStats.getDuration()).isEqualTo(3000);
assertThat(commitStats.getAttempts()).isEqualTo(2);
+ assertThat(commitStats.getLastCommittedSnapshotId()).isEqualTo(10);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java
index a270e0ecee..cdbf770c1f 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java
@@ -27,7 +27,6 @@ public class FileStoreSourceReaderMetrics {
private long latestFileCreationTime = UNDEFINED;
private long lastSplitUpdateTime = UNDEFINED;
-
public static final long UNDEFINED = -1L;
public static final long ACTIVE = Long.MAX_VALUE;