This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f09f63fb3 [core] Support compaction metrics (#2224)
f09f63fb3 is described below

commit f09f63fb3f2b3243d38670fc17f22c3cfc2f59de
Author: GuojunLi <[email protected]>
AuthorDate: Wed Nov 8 17:19:41 2023 +0800

    [core] Support compaction metrics (#2224)
    
    This closes #2224.
---
 .../org/apache/paimon/AppendOnlyFileStore.java     |   8 +-
 .../java/org/apache/paimon/KeyValueFileStore.java  |   8 +-
 .../paimon/append/AppendOnlyCompactManager.java    |  26 ++-
 .../paimon/append/AppendOnlyCompactionTask.java    |  32 ++--
 .../org/apache/paimon/compact/CompactTask.java     |  39 ++++-
 .../mergetree/compact/MergeTreeCompactManager.java |  11 +-
 .../mergetree/compact/MergeTreeCompactTask.java    |   7 +-
 .../paimon/operation/AbstractFileStoreWrite.java   |  37 ++++-
 .../paimon/operation/AppendOnlyFileStoreWrite.java |   9 +-
 .../apache/paimon/operation/FileStoreWrite.java    |   4 +
 .../paimon/operation/KeyValueFileStoreWrite.java   |  20 ++-
 .../paimon/operation/MemoryFileStoreWrite.java     |   7 +-
 .../operation/metrics/CompactionMetrics.java       | 110 +++++++++++++
 .../paimon/operation/metrics/CompactionStats.java  | 102 ++++++++++++
 .../paimon/table/AppendOnlyFileStoreTable.java     |   3 +-
 .../paimon/table/PrimaryKeyFileStoreTable.java     |   3 +-
 .../org/apache/paimon/table/sink/TableWrite.java   |   4 +
 .../apache/paimon/table/sink/TableWriteImpl.java   |   7 +
 .../test/java/org/apache/paimon/TestFileStore.java |   3 +-
 .../append/AppendOnlyCompactManagerTest.java       |   4 +-
 .../apache/paimon/append/AppendOnlyWriterTest.java |   3 +-
 .../apache/paimon/append/FullCompactTaskTest.java  |   2 +-
 .../apache/paimon/format/FileFormatSuffixTest.java |   3 +-
 .../apache/paimon/mergetree/MergeTreeTestBase.java |   6 +-
 .../compact/MergeTreeCompactManagerTest.java       |   3 +-
 .../operation/metrics/CompactionMetricsTest.java   | 174 +++++++++++++++++++++
 .../flink/source/TestChangelogDataReadWrite.java   |   3 +-
 27 files changed, 588 insertions(+), 50 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index f5fe41617..a36d02bd8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -43,6 +43,7 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
 
     private final RowType bucketKeyType;
     private final RowType rowType;
+    private final String tableName;
 
     public AppendOnlyFileStore(
             FileIO fileIO,
@@ -51,10 +52,12 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
             CoreOptions options,
             RowType partitionType,
             RowType bucketKeyType,
-            RowType rowType) {
+            RowType rowType,
+            String tableName) {
         super(fileIO, schemaManager, schemaId, options, partitionType);
         this.bucketKeyType = bucketKeyType;
         this.rowType = rowType;
+        this.tableName = tableName;
     }
 
     @Override
@@ -95,7 +98,8 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
                 pathFactory(),
                 snapshotManager(),
                 newScan(true).withManifestCacheFilter(manifestFilter),
-                options);
+                options,
+                tableName);
     }
 
     private AppendOnlyFileStoreScan newScan(boolean forWrite) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index ef122ee78..98b88323f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -65,6 +65,7 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
     private final Supplier<Comparator<InternalRow>> keyComparatorSupplier;
     private final Supplier<RecordEqualiser> valueEqualiserSupplier;
     private final MergeFunctionFactory<KeyValue> mfFactory;
+    private final String tableName;
 
     public KeyValueFileStore(
             FileIO fileIO,
@@ -77,7 +78,8 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
             RowType keyType,
             RowType valueType,
             KeyValueFieldsExtractor keyValueFieldsExtractor,
-            MergeFunctionFactory<KeyValue> mfFactory) {
+            MergeFunctionFactory<KeyValue> mfFactory,
+            String tableName) {
         super(fileIO, schemaManager, schemaId, options, partitionType);
         this.crossPartitionUpdate = crossPartitionUpdate;
         this.bucketKeyType = bucketKeyType;
@@ -87,6 +89,7 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
         this.mfFactory = mfFactory;
         this.keyComparatorSupplier = new KeyComparatorSupplier(keyType);
         this.valueEqualiserSupplier = new ValueEqualiserSupplier(valueType);
+        this.tableName = tableName;
     }
 
     @Override
@@ -147,7 +150,8 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
                 newScan(true).withManifestCacheFilter(manifestFilter),
                 indexFactory,
                 options,
-                keyValueFieldsExtractor);
+                keyValueFieldsExtractor,
+                tableName);
     }
 
     private Map<String, FileStorePathFactory> format2PathFactory() {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactManager.java
 
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactManager.java
index dd934ff20..b3569a0fb 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactManager.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactManager.java
@@ -24,11 +24,14 @@ import org.apache.paimon.compact.CompactFutureManager;
 import org.apache.paimon.compact.CompactResult;
 import org.apache.paimon.compact.CompactTask;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.operation.metrics.CompactionMetrics;
 import org.apache.paimon.utils.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -56,13 +59,16 @@ public class AppendOnlyCompactManager extends 
CompactFutureManager {
 
     private List<DataFileMeta> compacting;
 
+    @Nullable private final CompactionMetrics metrics;
+
     public AppendOnlyCompactManager(
             ExecutorService executor,
             List<DataFileMeta> restored,
             int minFileNum,
             int maxFileNum,
             long targetFileSize,
-            CompactRewriter rewriter) {
+            CompactRewriter rewriter,
+            @Nullable CompactionMetrics metrics) {
         this.executor = executor;
         this.toCompact = new TreeSet<>(fileComparator());
         this.toCompact.addAll(restored);
@@ -70,6 +76,7 @@ public class AppendOnlyCompactManager extends 
CompactFutureManager {
         this.maxFileNum = maxFileNum;
         this.targetFileSize = targetFileSize;
         this.rewriter = rewriter;
+        this.metrics = metrics;
     }
 
     @Override
@@ -90,7 +97,8 @@ public class AppendOnlyCompactManager extends 
CompactFutureManager {
             return;
         }
 
-        taskFuture = executor.submit(new FullCompactTask(toCompact, 
targetFileSize, rewriter));
+        taskFuture =
+                executor.submit(new FullCompactTask(toCompact, targetFileSize, 
rewriter, metrics));
         compacting = new ArrayList<>(toCompact);
         toCompact.clear();
     }
@@ -102,7 +110,7 @@ public class AppendOnlyCompactManager extends 
CompactFutureManager {
         Optional<List<DataFileMeta>> picked = pickCompactBefore();
         if (picked.isPresent()) {
             compacting = picked.get();
-            taskFuture = executor.submit(new AutoCompactTask(compacting, 
rewriter));
+            taskFuture = executor.submit(new AutoCompactTask(compacting, 
rewriter, metrics));
         }
     }
 
@@ -197,7 +205,11 @@ public class AppendOnlyCompactManager extends 
CompactFutureManager {
         private final CompactRewriter rewriter;
 
         public FullCompactTask(
-                Collection<DataFileMeta> inputs, long targetFileSize, 
CompactRewriter rewriter) {
+                Collection<DataFileMeta> inputs,
+                long targetFileSize,
+                CompactRewriter rewriter,
+                @Nullable CompactionMetrics metrics) {
+            super(metrics);
             this.inputs = new LinkedList<>(inputs);
             this.targetFileSize = targetFileSize;
             this.rewriter = rewriter;
@@ -249,7 +261,11 @@ public class AppendOnlyCompactManager extends 
CompactFutureManager {
         private final List<DataFileMeta> toCompact;
         private final CompactRewriter rewriter;
 
-        public AutoCompactTask(List<DataFileMeta> toCompact, CompactRewriter 
rewriter) {
+        public AutoCompactTask(
+                List<DataFileMeta> toCompact,
+                CompactRewriter rewriter,
+                @Nullable CompactionMetrics metrics) {
+            super(metrics);
             this.toCompact = toCompact;
             this.rewriter = rewriter;
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactionTask.java
 
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactionTask.java
index 3ef3da40e..0f9191471 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactionTask.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactionTask.java
@@ -23,6 +23,8 @@ import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.NewFilesIncrement;
 import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
+import org.apache.paimon.operation.metrics.CompactionMetrics;
+import org.apache.paimon.operation.metrics.CompactionStats;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.utils.Preconditions;
@@ -61,15 +63,27 @@ public class AppendOnlyCompactionTask {
     }
 
     public CommitMessage doCompact(AppendOnlyFileStoreWrite write) throws 
Exception {
-        compactAfter.addAll(write.compactRewriter(partition, 
0).rewrite(compactBefore));
-        CompactIncrement compactIncrement =
-                new CompactIncrement(compactBefore, compactAfter, 
Collections.emptyList());
-        return new CommitMessageImpl(
-                partition,
-                0, // bucket 0 is bucket for unaware-bucket table for 
compatibility with the old
-                // design
-                NewFilesIncrement.emptyIncrement(),
-                compactIncrement);
+        CompactionMetrics metrics = write.getCompactionMetrics(partition, 0);
+        long startMillis = System.currentTimeMillis();
+        try {
+            compactAfter.addAll(write.compactRewriter(partition, 
0).rewrite(compactBefore));
+            CompactIncrement compactIncrement =
+                    new CompactIncrement(compactBefore, compactAfter, 
Collections.emptyList());
+            return new CommitMessageImpl(
+                    partition,
+                    0, // bucket 0 is bucket for unaware-bucket table for 
compatibility with the old
+                    // design
+                    NewFilesIncrement.emptyIncrement(),
+                    compactIncrement);
+        } finally {
+            if (metrics != null) {
+                long duration = System.currentTimeMillis() - startMillis;
+                CompactionStats compactionStats =
+                        new CompactionStats(
+                                duration, compactBefore, compactAfter, 
Collections.emptyList());
+                metrics.reportCompaction(compactionStats);
+            }
+        }
     }
 
     public int hashCode() {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java 
b/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java
index a1988b7c9..dad4aa374 100644
--- a/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java
+++ b/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java
@@ -19,10 +19,15 @@
 package org.apache.paimon.compact;
 
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.operation.metrics.CompactionMetrics;
+import org.apache.paimon.operation.metrics.CompactionStats;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Callable;
 
@@ -30,17 +35,41 @@ import java.util.concurrent.Callable;
 public abstract class CompactTask implements Callable<CompactResult> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(CompactTask.class);
+    @Nullable private final CompactionMetrics metrics;
+
+    public CompactTask(@Nullable CompactionMetrics metrics) {
+        this.metrics = metrics;
+    }
 
     @Override
     public CompactResult call() throws Exception {
         long startMillis = System.currentTimeMillis();
-        CompactResult result = doCompact();
+        CompactResult result = null;
+        try {
+            result = doCompact();
 
-        if (LOG.isDebugEnabled()) {
-            logMetric(startMillis, result.before(), result.after());
+            if (LOG.isDebugEnabled()) {
+                logMetric(startMillis, result.before(), result.after());
+            }
+            return result;
+        } finally {
+            if (metrics != null) {
+                long duration = System.currentTimeMillis() - startMillis;
+                CompactionStats compactionStats =
+                        result == null
+                                ? new CompactionStats(
+                                        duration,
+                                        Collections.emptyList(),
+                                        Collections.emptyList(),
+                                        Collections.emptyList())
+                                : new CompactionStats(
+                                        duration,
+                                        result.before(),
+                                        result.after(),
+                                        result.changelog());
+                metrics.reportCompaction(compactionStats);
+            }
         }
-
-        return result;
     }
 
     protected String logMetric(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
index 300bfe84b..49935f2f6 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
@@ -27,11 +27,14 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.mergetree.LevelSortedRun;
 import org.apache.paimon.mergetree.Levels;
+import org.apache.paimon.operation.metrics.CompactionMetrics;
 import org.apache.paimon.utils.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.Comparator;
 import java.util.List;
@@ -53,6 +56,8 @@ public class MergeTreeCompactManager extends 
CompactFutureManager {
     private final int numSortedRunStopTrigger;
     private final CompactRewriter rewriter;
 
+    @Nullable private final CompactionMetrics metrics;
+
     public MergeTreeCompactManager(
             ExecutorService executor,
             Levels levels,
@@ -60,7 +65,8 @@ public class MergeTreeCompactManager extends 
CompactFutureManager {
             Comparator<InternalRow> keyComparator,
             long compactionFileSize,
             int numSortedRunStopTrigger,
-            CompactRewriter rewriter) {
+            CompactRewriter rewriter,
+            @Nullable CompactionMetrics metrics) {
         this.executor = executor;
         this.levels = levels;
         this.strategy = strategy;
@@ -68,6 +74,7 @@ public class MergeTreeCompactManager extends 
CompactFutureManager {
         this.numSortedRunStopTrigger = numSortedRunStopTrigger;
         this.keyComparator = keyComparator;
         this.rewriter = rewriter;
+        this.metrics = metrics;
     }
 
     @Override
@@ -162,7 +169,7 @@ public class MergeTreeCompactManager extends 
CompactFutureManager {
     private void submitCompaction(CompactUnit unit, boolean dropDelete) {
         MergeTreeCompactTask task =
                 new MergeTreeCompactTask(
-                        keyComparator, compactionFileSize, rewriter, unit, 
dropDelete);
+                        keyComparator, compactionFileSize, rewriter, unit, 
dropDelete, metrics);
         if (LOG.isDebugEnabled()) {
             LOG.debug(
                     "Pick these files (name, level, size) for compaction: {}",
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
index 150881f10..4fc11b076 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
@@ -24,6 +24,9 @@ import org.apache.paimon.compact.CompactUnit;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.mergetree.SortedRun;
+import org.apache.paimon.operation.metrics.CompactionMetrics;
+
+import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -50,7 +53,9 @@ public class MergeTreeCompactTask extends CompactTask {
             long minFileSize,
             CompactRewriter rewriter,
             CompactUnit unit,
-            boolean dropDelete) {
+            boolean dropDelete,
+            @Nullable CompactionMetrics metrics) {
+        super(metrics);
         this.minFileSize = minFileSize;
         this.rewriter = rewriter;
         this.outputLevel = unit.outputLevel();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index 125756a01..a5f64562b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -22,16 +22,20 @@ import org.apache.paimon.Snapshot;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.index.IndexMaintainer;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.IndexIncrement;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.memory.MemoryPoolFactory;
+import org.apache.paimon.metrics.MetricRegistry;
+import org.apache.paimon.operation.metrics.CompactionMetrics;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.utils.CommitIncrement;
 import org.apache.paimon.utils.ExecutorThreadFactory;
+import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.RecordWriter;
 import org.apache.paimon.utils.Restorable;
 import org.apache.paimon.utils.SnapshotManager;
@@ -73,18 +77,25 @@ public abstract class AbstractFileStoreWrite<T>
     private boolean closeCompactExecutorWhenLeaving = true;
     private boolean ignorePreviousFiles = false;
     protected boolean isStreamingMode = false;
+    private MetricRegistry metricRegistry = null;
+    private final String tableName;
+    private final FileStorePathFactory pathFactory;
 
     protected AbstractFileStoreWrite(
             String commitUser,
             SnapshotManager snapshotManager,
             FileStoreScan scan,
-            @Nullable IndexMaintainer.Factory<T> indexFactory) {
+            @Nullable IndexMaintainer.Factory<T> indexFactory,
+            String tableName,
+            FileStorePathFactory pathFactory) {
         this.commitUser = commitUser;
         this.snapshotManager = snapshotManager;
         this.scan = scan;
         this.indexFactory = indexFactory;
 
         this.writers = new HashMap<>();
+        this.tableName = tableName;
+        this.pathFactory = pathFactory;
     }
 
     @Override
@@ -343,6 +354,30 @@ public abstract class AbstractFileStoreWrite<T>
         this.isStreamingMode = isStreamingMode;
     }
 
+    @Override
+    public FileStoreWrite<T> withMetricRegistry(MetricRegistry metricRegistry) 
{
+        this.metricRegistry = metricRegistry;
+        return this;
+    }
+
+    @Nullable
+    public CompactionMetrics getCompactionMetrics(BinaryRow partition, int 
bucket) {
+        if (metricRegistry != null) {
+            return new CompactionMetrics(
+                    metricRegistry, tableName, getPartitionString(pathFactory, 
partition), bucket);
+        }
+        return null;
+    }
+
+    private String getPartitionString(FileStorePathFactory pathFactory, 
BinaryRow partition) {
+        String partitionStr =
+                
pathFactory.getPartitionString(partition).replace(Path.SEPARATOR, "_");
+        if (partitionStr.length() > 0) {
+            return partitionStr.substring(0, partitionStr.length() - 1);
+        }
+        return "_";
+    }
+
     private List<DataFileMeta> scanExistingFileMetas(
             long snapshotId, BinaryRow partition, int bucket) {
         List<DataFileMeta> existingFileMetas = new ArrayList<>();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index dc8b74433..eb4353f8c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -81,8 +81,9 @@ public class AppendOnlyFileStoreWrite extends 
MemoryFileStoreWrite<InternalRow>
             FileStorePathFactory pathFactory,
             SnapshotManager snapshotManager,
             FileStoreScan scan,
-            CoreOptions options) {
-        super(commitUser, snapshotManager, scan, options, null);
+            CoreOptions options,
+            String tableName) {
+        super(commitUser, snapshotManager, scan, options, null, tableName, 
pathFactory);
         this.fileIO = fileIO;
         this.read = read;
         this.schemaId = schemaId;
@@ -121,8 +122,8 @@ public class AppendOnlyFileStoreWrite extends 
MemoryFileStoreWrite<InternalRow>
                                 compactionMinFileNum,
                                 compactionMaxFileNum,
                                 targetFileSize,
-                                compactRewriter(partition, bucket));
-
+                                compactRewriter(partition, bucket),
+                                getCompactionMetrics(partition, bucket));
         return new AppendOnlyWriter(
                 fileIO,
                 ioManager,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
index 89a57fc7a..96fe0e1f5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
@@ -24,6 +24,7 @@ import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.memory.MemoryPoolFactory;
 import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.SinkRecord;
 import org.apache.paimon.utils.RecordWriter;
@@ -121,4 +122,7 @@ public interface FileStoreWrite<T> {
      * @throws Exception the thrown exception
      */
     void close() throws Exception;
+
+    /** With metrics to measure compaction. */
+    FileStoreWrite<T> withMetricRegistry(MetricRegistry metricRegistry);
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 6f4735be3..f6097c183 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -50,6 +50,7 @@ import 
org.apache.paimon.mergetree.compact.MergeFunctionFactory;
 import org.apache.paimon.mergetree.compact.MergeTreeCompactManager;
 import org.apache.paimon.mergetree.compact.MergeTreeCompactRewriter;
 import org.apache.paimon.mergetree.compact.UniversalCompaction;
+import org.apache.paimon.operation.metrics.CompactionMetrics;
 import org.apache.paimon.schema.KeyValueFieldsExtractor;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.types.RowType;
@@ -101,8 +102,9 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
             FileStoreScan scan,
             @Nullable IndexMaintainer.Factory<KeyValue> indexFactory,
             CoreOptions options,
-            KeyValueFieldsExtractor extractor) {
-        super(commitUser, snapshotManager, scan, options, indexFactory);
+            KeyValueFieldsExtractor extractor,
+            String tableName) {
+        super(commitUser, snapshotManager, scan, options, indexFactory, 
tableName, pathFactory);
         this.fileIO = fileIO;
         this.keyType = keyType;
         this.valueType = valueType;
@@ -161,7 +163,13 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                         ? new LookupCompaction(universalCompaction)
                         : universalCompaction;
         CompactManager compactManager =
-                createCompactManager(partition, bucket, compactStrategy, 
compactExecutor, levels);
+                createCompactManager(
+                        partition,
+                        bucket,
+                        compactStrategy,
+                        compactExecutor,
+                        levels,
+                        getCompactionMetrics(partition, bucket));
         return new MergeTreeWriter(
                 bufferSpillable(),
                 options.localSortMaxNumFileHandles(),
@@ -186,7 +194,8 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
             int bucket,
             CompactStrategy compactStrategy,
             ExecutorService compactExecutor,
-            Levels levels) {
+            Levels levels,
+            @Nullable CompactionMetrics metrics) {
         if (options.writeOnly()) {
             return new NoopCompactManager();
         } else {
@@ -199,7 +208,8 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                     keyComparator,
                     options.compactionFileSize(),
                     options.numSortedRunStopTrigger(),
-                    rewriter);
+                    rewriter,
+                    metrics);
         }
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
index 84c00756c..9fbff8efa 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
@@ -24,6 +24,7 @@ import org.apache.paimon.io.cache.CacheManager;
 import org.apache.paimon.memory.HeapMemorySegmentPool;
 import org.apache.paimon.memory.MemoryOwner;
 import org.apache.paimon.memory.MemoryPoolFactory;
+import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.RecordWriter;
 import org.apache.paimon.utils.SnapshotManager;
 
@@ -57,8 +58,10 @@ public abstract class MemoryFileStoreWrite<T> extends 
AbstractFileStoreWrite<T>
             SnapshotManager snapshotManager,
             FileStoreScan scan,
             CoreOptions options,
-            @Nullable IndexMaintainer.Factory<T> indexFactory) {
-        super(commitUser, snapshotManager, scan, indexFactory);
+            @Nullable IndexMaintainer.Factory<T> indexFactory,
+            String tableName,
+            FileStorePathFactory pathFactory) {
+        super(commitUser, snapshotManager, scan, indexFactory, tableName, 
pathFactory);
         this.options = options;
         this.cacheManager =
                 new CacheManager(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java
new file mode 100644
index 000000000..699812e57
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation.metrics;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.metrics.Histogram;
+import org.apache.paimon.metrics.MetricGroup;
+import org.apache.paimon.metrics.MetricRegistry;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Metrics to measure a compaction. */
+public class CompactionMetrics {
+    private static final Logger LOG = 
LoggerFactory.getLogger(CompactionMetrics.class);
+    private static final int HISTOGRAM_WINDOW_SIZE = 10_000;
+    private static final String GROUP_NAME = "compaction";
+
+    private final MetricGroup metricGroup;
+
+    public CompactionMetrics(
+            MetricRegistry registry, String tableName, String partition, int 
bucket) {
+        this.metricGroup = registry.bucketMetricGroup(GROUP_NAME, tableName, 
partition, bucket);
+        registerGenericCompactionMetrics();
+    }
+
+    @VisibleForTesting
+    public MetricGroup getMetricGroup() {
+        return metricGroup;
+    }
+
+    private Histogram durationHistogram;
+    private CompactionStats latestCompaction;
+
+    @VisibleForTesting static final String LAST_COMPACTION_DURATION = 
"lastCompactionDuration";
+    @VisibleForTesting static final String COMPACTION_DURATION = 
"compactionDuration";
+
+    @VisibleForTesting
+    static final String LAST_TABLE_FILES_COMPACTED_BEFORE = 
"lastTableFilesCompactedBefore";
+
+    @VisibleForTesting
+    static final String LAST_TABLE_FILES_COMPACTED_AFTER = 
"lastTableFilesCompactedAfter";
+
+    @VisibleForTesting
+    static final String LAST_CHANGELOG_FILES_COMPACTED = 
"lastChangelogFilesCompacted";
+
+    @VisibleForTesting
+    static final String LAST_REWRITE_INPUT_FILE_SIZE = 
"lastRewriteInputFileSize";
+
+    @VisibleForTesting
+    static final String LAST_REWRITE_OUTPUT_FILE_SIZE = 
"lastRewriteOutputFileSize";
+
+    @VisibleForTesting
+    static final String LAST_REWRITE_CHANGELOG_FILE_SIZE = 
"lastRewriteChangelogFileSize";
+
+    private void registerGenericCompactionMetrics() {
+        metricGroup.gauge(
+                LAST_COMPACTION_DURATION,
+                () -> latestCompaction == null ? 0L : 
latestCompaction.getDuration());
+        durationHistogram = metricGroup.histogram(COMPACTION_DURATION, 
HISTOGRAM_WINDOW_SIZE);
+        metricGroup.gauge(
+                LAST_TABLE_FILES_COMPACTED_BEFORE,
+                () ->
+                        latestCompaction == null
+                                ? 0L
+                                : 
latestCompaction.getCompactedDataFilesBefore());
+        metricGroup.gauge(
+                LAST_TABLE_FILES_COMPACTED_AFTER,
+                () ->
+                        latestCompaction == null
+                                ? 0L
+                                : 
latestCompaction.getCompactedDataFilesAfter());
+        metricGroup.gauge(
+                LAST_CHANGELOG_FILES_COMPACTED,
+                () -> latestCompaction == null ? 0L : 
latestCompaction.getCompactedChangelogs());
+        metricGroup.gauge(
+                LAST_REWRITE_INPUT_FILE_SIZE,
+                () -> latestCompaction == null ? 0L : 
latestCompaction.getRewriteInputFileSize());
+        metricGroup.gauge(
+                LAST_REWRITE_OUTPUT_FILE_SIZE,
+                () -> latestCompaction == null ? 0L : 
latestCompaction.getRewriteOutputFileSize());
+        metricGroup.gauge(
+                LAST_REWRITE_CHANGELOG_FILE_SIZE,
+                () ->
+                        latestCompaction == null
+                                ? 0L
+                                : 
latestCompaction.getRewriteChangelogFileSize());
+    }
+
+    public void reportCompaction(CompactionStats compactionStats) {
+        latestCompaction = compactionStats;
+        durationHistogram.update(compactionStats.getDuration());
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionStats.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionStats.java
new file mode 100644
index 000000000..ba71a2dad
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionStats.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation.metrics;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.io.DataFileMeta;
+
+import java.util.List;
+
+/** Statistics for a compaction. */
+public class CompactionStats {
+    private final long duration;
+    private final long compactedDataFilesBefore;
+    private final long compactedDataFilesAfter;
+    private final long compactedChangelogs;
+    private final long rewriteInputFileSize;
+    private final long rewriteOutputFileSize;
+    private final long rewriteChangelogFileSize;
+
+    public CompactionStats(
+            long compactionDuration,
+            List<DataFileMeta> compactBefore,
+            List<DataFileMeta> compactAfter,
+            List<DataFileMeta> compactChangelog) {
+        this.duration = compactionDuration;
+        this.compactedDataFilesBefore = compactBefore.size();
+        this.compactedDataFilesAfter = compactAfter.size();
+        this.compactedChangelogs = compactChangelog.size();
+        this.rewriteInputFileSize = rewriteFileSize(compactBefore);
+        this.rewriteOutputFileSize = rewriteFileSize(compactAfter);
+        this.rewriteChangelogFileSize = rewriteFileSize(compactChangelog);
+    }
+
+    @VisibleForTesting
+    protected long getDuration() {
+        return duration;
+    }
+
+    protected long getCompactedDataFilesBefore() {
+        return compactedDataFilesBefore;
+    }
+
+    protected long getCompactedDataFilesAfter() {
+        return compactedDataFilesAfter;
+    }
+
+    protected long getCompactedChangelogs() {
+        return compactedChangelogs;
+    }
+
+    protected long getRewriteInputFileSize() {
+        return rewriteInputFileSize;
+    }
+
+    protected long getRewriteOutputFileSize() {
+        return rewriteOutputFileSize;
+    }
+
+    protected long getRewriteChangelogFileSize() {
+        return rewriteChangelogFileSize;
+    }
+
+    private long rewriteFileSize(List<DataFileMeta> files) {
+        return files.stream().mapToLong(DataFileMeta::fileSize).sum();
+    }
+
+    @Override
+    public String toString() {
+        return "CompactionStats{"
+                + "duration="
+                + duration
+                + ", compactedDataFilesBefore="
+                + compactedDataFilesBefore
+                + ", compactedDataFilesAfter="
+                + compactedDataFilesAfter
+                + ", compactedChangelogs="
+                + compactedChangelogs
+                + ", rewriteInputFileSize="
+                + rewriteInputFileSize
+                + ", rewriteOutputFileSize="
+                + rewriteOutputFileSize
+                + ", rewriteChangelogFileSize="
+                + rewriteChangelogFileSize
+                + '}';
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 6f145ae9a..272870cc4 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -81,7 +81,8 @@ public class AppendOnlyFileStoreTable extends 
AbstractFileStoreTable {
                             new CoreOptions(tableSchema.options()),
                             tableSchema.logicalPartitionType(),
                             tableSchema.logicalBucketKeyType(),
-                            tableSchema.logicalRowType());
+                            tableSchema.logicalRowType(),
+                            name());
         }
         return lazyStore;
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index 78190fe97..02c6a36c2 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -106,7 +106,8 @@ public class PrimaryKeyFileStoreTable extends 
AbstractFileStoreTable {
                             new RowType(extractor.keyFields(tableSchema)),
                             rowType,
                             extractor,
-                            mfFactory);
+                            mfFactory,
+                            name());
         }
         return lazyStore;
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
index 57bfbd190..bf61cc57b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.table.Table;
 
 /**
@@ -58,4 +59,7 @@ public interface TableWrite extends AutoCloseable {
      * changelog.
      */
     void compact(BinaryRow partition, int bucket, boolean fullCompaction) 
throws Exception;
+
+    /** With metrics to measure compaction. */
+    TableWrite withMetricRegistry(MetricRegistry registry);
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index d3fe25007..f34a38a44 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -26,6 +26,7 @@ import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.memory.MemoryPoolFactory;
 import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.operation.AbstractFileStoreWrite;
 import org.apache.paimon.operation.FileStoreWrite;
 import org.apache.paimon.utils.Restorable;
@@ -146,6 +147,12 @@ public class TableWriteImpl<T>
         write.compact(partition, bucket, fullCompaction);
     }
 
+    @Override
+    public TableWriteImpl<T> withMetricRegistry(MetricRegistry metricRegistry) 
{
+        write.withMetricRegistry(metricRegistry);
+        return this;
+    }
+
     /**
      * Notify that some new files are created at given snapshot in given 
bucket.
      *
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java 
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 6b3b3f8a0..fbf058f7f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -112,7 +112,8 @@ public class TestFileStore extends KeyValueFileStore {
                 keyType,
                 valueType,
                 keyValueFieldsExtractor,
-                mfFactory);
+                mfFactory,
+                (new Path(root)).getName());
         this.root = root;
         this.fileIO = FileIOFinder.find(new Path(root));
         this.keySerializer = new InternalRowSerializer(keyType);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyCompactManagerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyCompactManagerTest.java
index dc4d250c8..68bc4db9c 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyCompactManagerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyCompactManagerTest.java
@@ -206,8 +206,8 @@ public class AppendOnlyCompactManagerTest {
                         minFileNum,
                         maxFileNum,
                         targetFileSize,
-                        null // not used
-                        );
+                        null, // not used
+                        null);
         Optional<List<DataFileMeta>> actual = manager.pickCompactBefore();
         assertThat(actual.isPresent()).isEqualTo(expectedPresent);
         if (expectedPresent) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 62a0292a8..4eefdfef6 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -527,7 +527,8 @@ public class AppendOnlyWriterTest {
                                     ? Collections.emptyList()
                                     : Collections.singletonList(
                                             
generateCompactAfter(compactBefore));
-                        });
+                        },
+                        null);
         CoreOptions options = new CoreOptions(new HashMap<>());
         AppendOnlyWriter writer =
                 new AppendOnlyWriter(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/FullCompactTaskTest.java 
b/paimon-core/src/test/java/org/apache/paimon/append/FullCompactTaskTest.java
index aab1f1ad8..5e474ccd4 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/append/FullCompactTaskTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/FullCompactTaskTest.java
@@ -123,7 +123,7 @@ public class FullCompactTaskTest {
                 Collection<DataFileMeta> inputs,
                 long targetFileSize,
                 AppendOnlyCompactManager.CompactRewriter rewriter) {
-            super(inputs, targetFileSize, rewriter);
+            super(inputs, targetFileSize, rewriter, null);
         }
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java 
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index 3404214d1..c8f5ebd00 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -76,7 +76,8 @@ public class FileFormatSuffixTest extends 
KeyValueFileReadWriteTest {
                         10,
                         SCHEMA,
                         0,
-                        new AppendOnlyCompactManager(null, toCompact, 4, 10, 
10, null), // not used
+                        new AppendOnlyCompactManager(
+                                null, toCompact, 4, 10, 10, null, null), // 
not used
                         false,
                         dataFilePathFactory,
                         null,
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index 718d8714a..4e544b0e6 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -444,7 +444,8 @@ public abstract class MergeTreeTestBase {
                 comparator,
                 options.compactionFileSize(),
                 options.numSortedRunStopTrigger(),
-                new TestRewriter());
+                new TestRewriter(),
+                null);
     }
 
     static class MockFailResultCompactionManager extends 
MergeTreeCompactManager {
@@ -463,7 +464,8 @@ public abstract class MergeTreeTestBase {
                     keyComparator,
                     minFileSize,
                     numSortedRunStopTrigger,
-                    rewriter);
+                    rewriter,
+                    null);
         }
 
         protected CompactResult obtainCompactResult()
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
index 4de33bfe0..338013d32 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
@@ -204,7 +204,8 @@ public class MergeTreeCompactManagerTest {
                         comparator,
                         2,
                         Integer.MAX_VALUE,
-                        new TestRewriter(expectedDropDelete));
+                        new TestRewriter(expectedDropDelete),
+                        null);
         manager.triggerCompaction(false);
         manager.getCompactionResult(true);
         List<LevelMinMax> outputs =
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactionMetricsTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactionMetricsTest.java
new file mode 100644
index 000000000..eb180d3c1
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactionMetricsTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation.metrics;
+
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFileTestUtils;
+import org.apache.paimon.metrics.Gauge;
+import org.apache.paimon.metrics.Histogram;
+import org.apache.paimon.metrics.Metric;
+import org.apache.paimon.metrics.MetricRegistryImpl;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.offset;
+
+/** Tests for {@link CompactionMetrics}. */
+public class CompactionMetricsTest {
+
+    private static final String TABLE_NAME = "myTable";
+    private static final String PARTITION = "date=20230623";
+    private static final int BUCKET = 5;
+
+    /** Tests that the metrics are updated properly. */
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testMetricsAreUpdated() {
+        CompactionMetrics compactionMetrics = getCompactionMetrics();
+        Map<String, Metric> registeredGenericMetrics =
+                compactionMetrics.getMetricGroup().getMetrics();
+
+        // Check initial values
+        Gauge<Long> lastCompactionDuration =
+                (Gauge<Long>)
+                        
registeredGenericMetrics.get(CompactionMetrics.LAST_COMPACTION_DURATION);
+        Histogram compactionDuration =
+                (Histogram) 
registeredGenericMetrics.get(CompactionMetrics.COMPACTION_DURATION);
+        Gauge<Long> lastTableFilesCompactedBefore =
+                (Gauge<Long>)
+                        registeredGenericMetrics.get(
+                                
CompactionMetrics.LAST_TABLE_FILES_COMPACTED_BEFORE);
+        Gauge<Long> lastTableFilesCompactedAfter =
+                (Gauge<Long>)
+                        registeredGenericMetrics.get(
+                                
CompactionMetrics.LAST_TABLE_FILES_COMPACTED_AFTER);
+        Gauge<Long> lastChangelogFilesCompacted =
+                (Gauge<Long>)
+                        registeredGenericMetrics.get(
+                                
CompactionMetrics.LAST_CHANGELOG_FILES_COMPACTED);
+        Gauge<Long> lastRewriteInputFileSize =
+                (Gauge<Long>)
+                        registeredGenericMetrics.get(
+                                
CompactionMetrics.LAST_REWRITE_INPUT_FILE_SIZE);
+        Gauge<Long> lastRewriteOutputFileSize =
+                (Gauge<Long>)
+                        registeredGenericMetrics.get(
+                                
CompactionMetrics.LAST_REWRITE_OUTPUT_FILE_SIZE);
+        Gauge<Long> lastRewriteChangelogFileSize =
+                (Gauge<Long>)
+                        registeredGenericMetrics.get(
+                                
CompactionMetrics.LAST_REWRITE_CHANGELOG_FILE_SIZE);
+
+        assertThat(lastCompactionDuration.getValue()).isEqualTo(0);
+        assertThat(compactionDuration.getCount()).isEqualTo(0);
+        assertThat(compactionDuration.getStatistics().size()).isEqualTo(0);
+        assertThat(lastTableFilesCompactedBefore.getValue()).isEqualTo(0);
+        assertThat(lastTableFilesCompactedAfter.getValue()).isEqualTo(0);
+        assertThat(lastChangelogFilesCompacted.getValue()).isEqualTo(0);
+        assertThat(lastRewriteInputFileSize.getValue()).isEqualTo(0);
+        assertThat(lastRewriteOutputFileSize.getValue()).isEqualTo(0);
+        assertThat(lastRewriteChangelogFileSize.getValue()).isEqualTo(0);
+
+        // report once
+        reportOnce(compactionMetrics);
+
+        // generic metrics value updated
+        assertThat(lastCompactionDuration.getValue()).isEqualTo(3000);
+        assertThat(compactionDuration.getCount()).isEqualTo(1);
+        assertThat(compactionDuration.getStatistics().size()).isEqualTo(1);
+        
assertThat(compactionDuration.getStatistics().getValues()[0]).isEqualTo(3000L);
+        
assertThat(compactionDuration.getStatistics().getMin()).isEqualTo(3000);
+        assertThat(compactionDuration.getStatistics().getQuantile(0.5))
+                .isCloseTo(3000.0, offset(0.001));
+        
assertThat(compactionDuration.getStatistics().getMean()).isEqualTo(3000);
+        
assertThat(compactionDuration.getStatistics().getMax()).isEqualTo(3000);
+        
assertThat(compactionDuration.getStatistics().getStdDev()).isEqualTo(0);
+        assertThat(lastTableFilesCompactedBefore.getValue()).isEqualTo(2);
+        assertThat(lastTableFilesCompactedAfter.getValue()).isEqualTo(2);
+        assertThat(lastChangelogFilesCompacted.getValue()).isEqualTo(2);
+        assertThat(lastRewriteInputFileSize.getValue()).isEqualTo(2001);
+        assertThat(lastRewriteOutputFileSize.getValue()).isEqualTo(1101);
+        assertThat(lastRewriteChangelogFileSize.getValue()).isEqualTo(3001);
+
+        // report again
+        reportAgain(compactionMetrics);
+
+        // generic metrics value updated
+        assertThat(lastCompactionDuration.getValue()).isEqualTo(6000);
+        assertThat(compactionDuration.getCount()).isEqualTo(2);
+        assertThat(compactionDuration.getStatistics().size()).isEqualTo(2);
+        
assertThat(compactionDuration.getStatistics().getValues()[1]).isEqualTo(6000L);
+        
assertThat(compactionDuration.getStatistics().getMin()).isEqualTo(3000);
+        assertThat(compactionDuration.getStatistics().getQuantile(0.5))
+                .isCloseTo(4500, offset(0.001));
+        
assertThat(compactionDuration.getStatistics().getMean()).isEqualTo(4500);
+        
assertThat(compactionDuration.getStatistics().getMax()).isEqualTo(6000);
+        assertThat(compactionDuration.getStatistics().getStdDev())
+                .isCloseTo(2121.320, offset(0.001));
+        assertThat(lastTableFilesCompactedBefore.getValue()).isEqualTo(2);
+        assertThat(lastTableFilesCompactedAfter.getValue()).isEqualTo(2);
+        assertThat(lastChangelogFilesCompacted.getValue()).isEqualTo(2);
+        assertThat(lastRewriteInputFileSize.getValue()).isEqualTo(2001);
+        assertThat(lastRewriteOutputFileSize.getValue()).isEqualTo(1201);
+        assertThat(lastRewriteChangelogFileSize.getValue()).isEqualTo(2501);
+    }
+
+    private void reportOnce(CompactionMetrics compactionMetrics) {
+        List<DataFileMeta> compactBefore = new ArrayList<>();
+        List<DataFileMeta> compactAfter = new ArrayList<>();
+        List<DataFileMeta> compactChangelog = new ArrayList<>();
+        compactBefore.add(DataFileTestUtils.newFile(0, 1000));
+        compactBefore.add(DataFileTestUtils.newFile(1001, 2000));
+        compactAfter.add(DataFileTestUtils.newFile(400, 1000));
+        compactAfter.add(DataFileTestUtils.newFile(1001, 1500));
+        compactChangelog.add(DataFileTestUtils.newFile(0, 2000));
+        compactChangelog.add(DataFileTestUtils.newFile(2001, 3000));
+
+        CompactionStats compactionStats =
+                new CompactionStats(3000, compactBefore, compactAfter, 
compactChangelog);
+
+        compactionMetrics.reportCompaction(compactionStats);
+    }
+
+    private void reportAgain(CompactionMetrics compactionMetrics) {
+        List<DataFileMeta> compactBefore = new ArrayList<>();
+        List<DataFileMeta> compactAfter = new ArrayList<>();
+        List<DataFileMeta> compactChangelog = new ArrayList<>();
+        compactBefore.add(DataFileTestUtils.newFile(2000, 3000));
+        compactBefore.add(DataFileTestUtils.newFile(3001, 4000));
+        compactAfter.add(DataFileTestUtils.newFile(600, 1200));
+        compactAfter.add(DataFileTestUtils.newFile(1201, 1800));
+        compactChangelog.add(DataFileTestUtils.newFile(0, 1500));
+        compactChangelog.add(DataFileTestUtils.newFile(1501, 2500));
+
+        CompactionStats compactionStats =
+                new CompactionStats(6000, compactBefore, compactAfter, 
compactChangelog);
+
+        compactionMetrics.reportCompaction(compactionStats);
+    }
+
+    private CompactionMetrics getCompactionMetrics() {
+        return new CompactionMetrics(new MetricRegistryImpl(), TABLE_NAME, 
PARTITION, BUCKET);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index 92527df54..0f9d1cd30 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -189,7 +189,8 @@ public class TestChangelogDataReadWrite {
                                 null, // not used, we only create an empty 
writer
                                 null,
                                 options,
-                                EXTRACTOR)
+                                EXTRACTOR,
+                                tablePath.getName())
                         .createWriterContainer(partition, bucket, true)
                         .writer;
         ((MemoryOwner) writer)

Reply via email to