This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f09f63fb3 [core] Support compaction metrics (#2224)
f09f63fb3 is described below
commit f09f63fb3f2b3243d38670fc17f22c3cfc2f59de
Author: GuojunLi <[email protected]>
AuthorDate: Wed Nov 8 17:19:41 2023 +0800
[core] Support compaction metrics (#2224)
This closes #2224.
---
.../org/apache/paimon/AppendOnlyFileStore.java | 8 +-
.../java/org/apache/paimon/KeyValueFileStore.java | 8 +-
.../paimon/append/AppendOnlyCompactManager.java | 26 ++-
.../paimon/append/AppendOnlyCompactionTask.java | 32 ++--
.../org/apache/paimon/compact/CompactTask.java | 39 ++++-
.../mergetree/compact/MergeTreeCompactManager.java | 11 +-
.../mergetree/compact/MergeTreeCompactTask.java | 7 +-
.../paimon/operation/AbstractFileStoreWrite.java | 37 ++++-
.../paimon/operation/AppendOnlyFileStoreWrite.java | 9 +-
.../apache/paimon/operation/FileStoreWrite.java | 4 +
.../paimon/operation/KeyValueFileStoreWrite.java | 20 ++-
.../paimon/operation/MemoryFileStoreWrite.java | 7 +-
.../operation/metrics/CompactionMetrics.java | 110 +++++++++++++
.../paimon/operation/metrics/CompactionStats.java | 102 ++++++++++++
.../paimon/table/AppendOnlyFileStoreTable.java | 3 +-
.../paimon/table/PrimaryKeyFileStoreTable.java | 3 +-
.../org/apache/paimon/table/sink/TableWrite.java | 4 +
.../apache/paimon/table/sink/TableWriteImpl.java | 7 +
.../test/java/org/apache/paimon/TestFileStore.java | 3 +-
.../append/AppendOnlyCompactManagerTest.java | 4 +-
.../apache/paimon/append/AppendOnlyWriterTest.java | 3 +-
.../apache/paimon/append/FullCompactTaskTest.java | 2 +-
.../apache/paimon/format/FileFormatSuffixTest.java | 3 +-
.../apache/paimon/mergetree/MergeTreeTestBase.java | 6 +-
.../compact/MergeTreeCompactManagerTest.java | 3 +-
.../operation/metrics/CompactionMetricsTest.java | 174 +++++++++++++++++++++
.../flink/source/TestChangelogDataReadWrite.java | 3 +-
27 files changed, 588 insertions(+), 50 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index f5fe41617..a36d02bd8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -43,6 +43,7 @@ public class AppendOnlyFileStore extends
AbstractFileStore<InternalRow> {
private final RowType bucketKeyType;
private final RowType rowType;
+ private final String tableName;
public AppendOnlyFileStore(
FileIO fileIO,
@@ -51,10 +52,12 @@ public class AppendOnlyFileStore extends
AbstractFileStore<InternalRow> {
CoreOptions options,
RowType partitionType,
RowType bucketKeyType,
- RowType rowType) {
+ RowType rowType,
+ String tableName) {
super(fileIO, schemaManager, schemaId, options, partitionType);
this.bucketKeyType = bucketKeyType;
this.rowType = rowType;
+ this.tableName = tableName;
}
@Override
@@ -95,7 +98,8 @@ public class AppendOnlyFileStore extends
AbstractFileStore<InternalRow> {
pathFactory(),
snapshotManager(),
newScan(true).withManifestCacheFilter(manifestFilter),
- options);
+ options,
+ tableName);
}
private AppendOnlyFileStoreScan newScan(boolean forWrite) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index ef122ee78..98b88323f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -65,6 +65,7 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
private final Supplier<Comparator<InternalRow>> keyComparatorSupplier;
private final Supplier<RecordEqualiser> valueEqualiserSupplier;
private final MergeFunctionFactory<KeyValue> mfFactory;
+ private final String tableName;
public KeyValueFileStore(
FileIO fileIO,
@@ -77,7 +78,8 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
RowType keyType,
RowType valueType,
KeyValueFieldsExtractor keyValueFieldsExtractor,
- MergeFunctionFactory<KeyValue> mfFactory) {
+ MergeFunctionFactory<KeyValue> mfFactory,
+ String tableName) {
super(fileIO, schemaManager, schemaId, options, partitionType);
this.crossPartitionUpdate = crossPartitionUpdate;
this.bucketKeyType = bucketKeyType;
@@ -87,6 +89,7 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
this.mfFactory = mfFactory;
this.keyComparatorSupplier = new KeyComparatorSupplier(keyType);
this.valueEqualiserSupplier = new ValueEqualiserSupplier(valueType);
+ this.tableName = tableName;
}
@Override
@@ -147,7 +150,8 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
newScan(true).withManifestCacheFilter(manifestFilter),
indexFactory,
options,
- keyValueFieldsExtractor);
+ keyValueFieldsExtractor,
+ tableName);
}
private Map<String, FileStorePathFactory> format2PathFactory() {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactManager.java
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactManager.java
index dd934ff20..b3569a0fb 100644
---
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactManager.java
+++
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactManager.java
@@ -24,11 +24,14 @@ import org.apache.paimon.compact.CompactFutureManager;
import org.apache.paimon.compact.CompactResult;
import org.apache.paimon.compact.CompactTask;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.operation.metrics.CompactionMetrics;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -56,13 +59,16 @@ public class AppendOnlyCompactManager extends
CompactFutureManager {
private List<DataFileMeta> compacting;
+ @Nullable private final CompactionMetrics metrics;
+
public AppendOnlyCompactManager(
ExecutorService executor,
List<DataFileMeta> restored,
int minFileNum,
int maxFileNum,
long targetFileSize,
- CompactRewriter rewriter) {
+ CompactRewriter rewriter,
+ @Nullable CompactionMetrics metrics) {
this.executor = executor;
this.toCompact = new TreeSet<>(fileComparator());
this.toCompact.addAll(restored);
@@ -70,6 +76,7 @@ public class AppendOnlyCompactManager extends
CompactFutureManager {
this.maxFileNum = maxFileNum;
this.targetFileSize = targetFileSize;
this.rewriter = rewriter;
+ this.metrics = metrics;
}
@Override
@@ -90,7 +97,8 @@ public class AppendOnlyCompactManager extends
CompactFutureManager {
return;
}
- taskFuture = executor.submit(new FullCompactTask(toCompact,
targetFileSize, rewriter));
+ taskFuture =
+ executor.submit(new FullCompactTask(toCompact, targetFileSize,
rewriter, metrics));
compacting = new ArrayList<>(toCompact);
toCompact.clear();
}
@@ -102,7 +110,7 @@ public class AppendOnlyCompactManager extends
CompactFutureManager {
Optional<List<DataFileMeta>> picked = pickCompactBefore();
if (picked.isPresent()) {
compacting = picked.get();
- taskFuture = executor.submit(new AutoCompactTask(compacting,
rewriter));
+ taskFuture = executor.submit(new AutoCompactTask(compacting,
rewriter, metrics));
}
}
@@ -197,7 +205,11 @@ public class AppendOnlyCompactManager extends
CompactFutureManager {
private final CompactRewriter rewriter;
public FullCompactTask(
- Collection<DataFileMeta> inputs, long targetFileSize,
CompactRewriter rewriter) {
+ Collection<DataFileMeta> inputs,
+ long targetFileSize,
+ CompactRewriter rewriter,
+ @Nullable CompactionMetrics metrics) {
+ super(metrics);
this.inputs = new LinkedList<>(inputs);
this.targetFileSize = targetFileSize;
this.rewriter = rewriter;
@@ -249,7 +261,11 @@ public class AppendOnlyCompactManager extends
CompactFutureManager {
private final List<DataFileMeta> toCompact;
private final CompactRewriter rewriter;
- public AutoCompactTask(List<DataFileMeta> toCompact, CompactRewriter
rewriter) {
+ public AutoCompactTask(
+ List<DataFileMeta> toCompact,
+ CompactRewriter rewriter,
+ @Nullable CompactionMetrics metrics) {
+ super(metrics);
this.toCompact = toCompact;
this.rewriter = rewriter;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactionTask.java
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactionTask.java
index 3ef3da40e..0f9191471 100644
---
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactionTask.java
+++
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactionTask.java
@@ -23,6 +23,8 @@ import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.NewFilesIncrement;
import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
+import org.apache.paimon.operation.metrics.CompactionMetrics;
+import org.apache.paimon.operation.metrics.CompactionStats;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.Preconditions;
@@ -61,15 +63,27 @@ public class AppendOnlyCompactionTask {
}
public CommitMessage doCompact(AppendOnlyFileStoreWrite write) throws
Exception {
- compactAfter.addAll(write.compactRewriter(partition,
0).rewrite(compactBefore));
- CompactIncrement compactIncrement =
- new CompactIncrement(compactBefore, compactAfter,
Collections.emptyList());
- return new CommitMessageImpl(
- partition,
- 0, // bucket 0 is bucket for unaware-bucket table for
compatibility with the old
- // design
- NewFilesIncrement.emptyIncrement(),
- compactIncrement);
+ CompactionMetrics metrics = write.getCompactionMetrics(partition, 0);
+ long startMillis = System.currentTimeMillis();
+ try {
+ compactAfter.addAll(write.compactRewriter(partition,
0).rewrite(compactBefore));
+ CompactIncrement compactIncrement =
+ new CompactIncrement(compactBefore, compactAfter,
Collections.emptyList());
+ return new CommitMessageImpl(
+ partition,
+ 0, // bucket 0 is bucket for unaware-bucket table for
compatibility with the old
+ // design
+ NewFilesIncrement.emptyIncrement(),
+ compactIncrement);
+ } finally {
+ if (metrics != null) {
+ long duration = System.currentTimeMillis() - startMillis;
+ CompactionStats compactionStats =
+ new CompactionStats(
+ duration, compactBefore, compactAfter,
Collections.emptyList());
+ metrics.reportCompaction(compactionStats);
+ }
+ }
}
public int hashCode() {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java
b/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java
index a1988b7c9..dad4aa374 100644
--- a/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java
+++ b/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java
@@ -19,10 +19,15 @@
package org.apache.paimon.compact;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.operation.metrics.CompactionMetrics;
+import org.apache.paimon.operation.metrics.CompactionStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
@@ -30,17 +35,41 @@ import java.util.concurrent.Callable;
public abstract class CompactTask implements Callable<CompactResult> {
private static final Logger LOG =
LoggerFactory.getLogger(CompactTask.class);
+ @Nullable private final CompactionMetrics metrics;
+
+ public CompactTask(@Nullable CompactionMetrics metrics) {
+ this.metrics = metrics;
+ }
@Override
public CompactResult call() throws Exception {
long startMillis = System.currentTimeMillis();
- CompactResult result = doCompact();
+ CompactResult result = null;
+ try {
+ result = doCompact();
- if (LOG.isDebugEnabled()) {
- logMetric(startMillis, result.before(), result.after());
+ if (LOG.isDebugEnabled()) {
+ logMetric(startMillis, result.before(), result.after());
+ }
+ return result;
+ } finally {
+ if (metrics != null) {
+ long duration = System.currentTimeMillis() - startMillis;
+ CompactionStats compactionStats =
+ result == null
+ ? new CompactionStats(
+ duration,
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList())
+ : new CompactionStats(
+ duration,
+ result.before(),
+ result.after(),
+ result.changelog());
+ metrics.reportCompaction(compactionStats);
+ }
}
-
- return result;
}
protected String logMetric(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
index 300bfe84b..49935f2f6 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
@@ -27,11 +27,14 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.mergetree.LevelSortedRun;
import org.apache.paimon.mergetree.Levels;
+import org.apache.paimon.operation.metrics.CompactionMetrics;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.Comparator;
import java.util.List;
@@ -53,6 +56,8 @@ public class MergeTreeCompactManager extends
CompactFutureManager {
private final int numSortedRunStopTrigger;
private final CompactRewriter rewriter;
+ @Nullable private final CompactionMetrics metrics;
+
public MergeTreeCompactManager(
ExecutorService executor,
Levels levels,
@@ -60,7 +65,8 @@ public class MergeTreeCompactManager extends
CompactFutureManager {
Comparator<InternalRow> keyComparator,
long compactionFileSize,
int numSortedRunStopTrigger,
- CompactRewriter rewriter) {
+ CompactRewriter rewriter,
+ @Nullable CompactionMetrics metrics) {
this.executor = executor;
this.levels = levels;
this.strategy = strategy;
@@ -68,6 +74,7 @@ public class MergeTreeCompactManager extends
CompactFutureManager {
this.numSortedRunStopTrigger = numSortedRunStopTrigger;
this.keyComparator = keyComparator;
this.rewriter = rewriter;
+ this.metrics = metrics;
}
@Override
@@ -162,7 +169,7 @@ public class MergeTreeCompactManager extends
CompactFutureManager {
private void submitCompaction(CompactUnit unit, boolean dropDelete) {
MergeTreeCompactTask task =
new MergeTreeCompactTask(
- keyComparator, compactionFileSize, rewriter, unit,
dropDelete);
+ keyComparator, compactionFileSize, rewriter, unit,
dropDelete, metrics);
if (LOG.isDebugEnabled()) {
LOG.debug(
"Pick these files (name, level, size) for compaction: {}",
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
index 150881f10..4fc11b076 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
@@ -24,6 +24,9 @@ import org.apache.paimon.compact.CompactUnit;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.mergetree.SortedRun;
+import org.apache.paimon.operation.metrics.CompactionMetrics;
+
+import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Comparator;
@@ -50,7 +53,9 @@ public class MergeTreeCompactTask extends CompactTask {
long minFileSize,
CompactRewriter rewriter,
CompactUnit unit,
- boolean dropDelete) {
+ boolean dropDelete,
+ @Nullable CompactionMetrics metrics) {
+ super(metrics);
this.minFileSize = minFileSize;
this.rewriter = rewriter;
this.outputLevel = unit.outputLevel();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index 125756a01..a5f64562b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -22,16 +22,20 @@ import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.index.IndexMaintainer;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.memory.MemoryPoolFactory;
+import org.apache.paimon.metrics.MetricRegistry;
+import org.apache.paimon.operation.metrics.CompactionMetrics;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.ExecutorThreadFactory;
+import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.RecordWriter;
import org.apache.paimon.utils.Restorable;
import org.apache.paimon.utils.SnapshotManager;
@@ -73,18 +77,25 @@ public abstract class AbstractFileStoreWrite<T>
private boolean closeCompactExecutorWhenLeaving = true;
private boolean ignorePreviousFiles = false;
protected boolean isStreamingMode = false;
+ private MetricRegistry metricRegistry = null;
+ private final String tableName;
+ private final FileStorePathFactory pathFactory;
protected AbstractFileStoreWrite(
String commitUser,
SnapshotManager snapshotManager,
FileStoreScan scan,
- @Nullable IndexMaintainer.Factory<T> indexFactory) {
+ @Nullable IndexMaintainer.Factory<T> indexFactory,
+ String tableName,
+ FileStorePathFactory pathFactory) {
this.commitUser = commitUser;
this.snapshotManager = snapshotManager;
this.scan = scan;
this.indexFactory = indexFactory;
this.writers = new HashMap<>();
+ this.tableName = tableName;
+ this.pathFactory = pathFactory;
}
@Override
@@ -343,6 +354,30 @@ public abstract class AbstractFileStoreWrite<T>
this.isStreamingMode = isStreamingMode;
}
+ @Override
+ public FileStoreWrite<T> withMetricRegistry(MetricRegistry metricRegistry)
{
+ this.metricRegistry = metricRegistry;
+ return this;
+ }
+
+ @Nullable
+ public CompactionMetrics getCompactionMetrics(BinaryRow partition, int
bucket) {
+ if (metricRegistry != null) {
+ return new CompactionMetrics(
+ metricRegistry, tableName, getPartitionString(pathFactory,
partition), bucket);
+ }
+ return null;
+ }
+
+ private String getPartitionString(FileStorePathFactory pathFactory,
BinaryRow partition) {
+ String partitionStr =
+
pathFactory.getPartitionString(partition).replace(Path.SEPARATOR, "_");
+ if (partitionStr.length() > 0) {
+ return partitionStr.substring(0, partitionStr.length() - 1);
+ }
+ return "_";
+ }
+
private List<DataFileMeta> scanExistingFileMetas(
long snapshotId, BinaryRow partition, int bucket) {
List<DataFileMeta> existingFileMetas = new ArrayList<>();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index dc8b74433..eb4353f8c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -81,8 +81,9 @@ public class AppendOnlyFileStoreWrite extends
MemoryFileStoreWrite<InternalRow>
FileStorePathFactory pathFactory,
SnapshotManager snapshotManager,
FileStoreScan scan,
- CoreOptions options) {
- super(commitUser, snapshotManager, scan, options, null);
+ CoreOptions options,
+ String tableName) {
+ super(commitUser, snapshotManager, scan, options, null, tableName,
pathFactory);
this.fileIO = fileIO;
this.read = read;
this.schemaId = schemaId;
@@ -121,8 +122,8 @@ public class AppendOnlyFileStoreWrite extends
MemoryFileStoreWrite<InternalRow>
compactionMinFileNum,
compactionMaxFileNum,
targetFileSize,
- compactRewriter(partition, bucket));
-
+ compactRewriter(partition, bucket),
+ getCompactionMetrics(partition, bucket));
return new AppendOnlyWriter(
fileIO,
ioManager,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
index 89a57fc7a..96fe0e1f5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
@@ -24,6 +24,7 @@ import org.apache.paimon.disk.IOManager;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.SinkRecord;
import org.apache.paimon.utils.RecordWriter;
@@ -121,4 +122,7 @@ public interface FileStoreWrite<T> {
* @throws Exception the thrown exception
*/
void close() throws Exception;
+
+ /** With metrics to measure compaction. */
+ FileStoreWrite<T> withMetricRegistry(MetricRegistry metricRegistry);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 6f4735be3..f6097c183 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -50,6 +50,7 @@ import
org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import org.apache.paimon.mergetree.compact.MergeTreeCompactManager;
import org.apache.paimon.mergetree.compact.MergeTreeCompactRewriter;
import org.apache.paimon.mergetree.compact.UniversalCompaction;
+import org.apache.paimon.operation.metrics.CompactionMetrics;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.types.RowType;
@@ -101,8 +102,9 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
FileStoreScan scan,
@Nullable IndexMaintainer.Factory<KeyValue> indexFactory,
CoreOptions options,
- KeyValueFieldsExtractor extractor) {
- super(commitUser, snapshotManager, scan, options, indexFactory);
+ KeyValueFieldsExtractor extractor,
+ String tableName) {
+ super(commitUser, snapshotManager, scan, options, indexFactory,
tableName, pathFactory);
this.fileIO = fileIO;
this.keyType = keyType;
this.valueType = valueType;
@@ -161,7 +163,13 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
? new LookupCompaction(universalCompaction)
: universalCompaction;
CompactManager compactManager =
- createCompactManager(partition, bucket, compactStrategy,
compactExecutor, levels);
+ createCompactManager(
+ partition,
+ bucket,
+ compactStrategy,
+ compactExecutor,
+ levels,
+ getCompactionMetrics(partition, bucket));
return new MergeTreeWriter(
bufferSpillable(),
options.localSortMaxNumFileHandles(),
@@ -186,7 +194,8 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
int bucket,
CompactStrategy compactStrategy,
ExecutorService compactExecutor,
- Levels levels) {
+ Levels levels,
+ @Nullable CompactionMetrics metrics) {
if (options.writeOnly()) {
return new NoopCompactManager();
} else {
@@ -199,7 +208,8 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
keyComparator,
options.compactionFileSize(),
options.numSortedRunStopTrigger(),
- rewriter);
+ rewriter,
+ metrics);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
index 84c00756c..9fbff8efa 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
@@ -24,6 +24,7 @@ import org.apache.paimon.io.cache.CacheManager;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.memory.MemoryOwner;
import org.apache.paimon.memory.MemoryPoolFactory;
+import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.RecordWriter;
import org.apache.paimon.utils.SnapshotManager;
@@ -57,8 +58,10 @@ public abstract class MemoryFileStoreWrite<T> extends
AbstractFileStoreWrite<T>
SnapshotManager snapshotManager,
FileStoreScan scan,
CoreOptions options,
- @Nullable IndexMaintainer.Factory<T> indexFactory) {
- super(commitUser, snapshotManager, scan, indexFactory);
+ @Nullable IndexMaintainer.Factory<T> indexFactory,
+ String tableName,
+ FileStorePathFactory pathFactory) {
+ super(commitUser, snapshotManager, scan, indexFactory, tableName,
pathFactory);
this.options = options;
this.cacheManager =
new CacheManager(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java
new file mode 100644
index 000000000..699812e57
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation.metrics;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.metrics.Histogram;
+import org.apache.paimon.metrics.MetricGroup;
+import org.apache.paimon.metrics.MetricRegistry;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Metrics to measure a compaction. */
+public class CompactionMetrics {
+ private static final Logger LOG =
LoggerFactory.getLogger(CompactionMetrics.class);
+ private static final int HISTOGRAM_WINDOW_SIZE = 10_000;
+ private static final String GROUP_NAME = "compaction";
+
+ private final MetricGroup metricGroup;
+
+ public CompactionMetrics(
+ MetricRegistry registry, String tableName, String partition, int
bucket) {
+ this.metricGroup = registry.bucketMetricGroup(GROUP_NAME, tableName,
partition, bucket);
+ registerGenericCompactionMetrics();
+ }
+
+ @VisibleForTesting
+ public MetricGroup getMetricGroup() {
+ return metricGroup;
+ }
+
+ private Histogram durationHistogram;
+ private CompactionStats latestCompaction;
+
+ @VisibleForTesting static final String LAST_COMPACTION_DURATION =
"lastCompactionDuration";
+ @VisibleForTesting static final String COMPACTION_DURATION =
"compactionDuration";
+
+ @VisibleForTesting
+ static final String LAST_TABLE_FILES_COMPACTED_BEFORE =
"lastTableFilesCompactedBefore";
+
+ @VisibleForTesting
+ static final String LAST_TABLE_FILES_COMPACTED_AFTER =
"lastTableFilesCompactedAfter";
+
+ @VisibleForTesting
+ static final String LAST_CHANGELOG_FILES_COMPACTED =
"lastChangelogFilesCompacted";
+
+ @VisibleForTesting
+ static final String LAST_REWRITE_INPUT_FILE_SIZE =
"lastRewriteInputFileSize";
+
+ @VisibleForTesting
+ static final String LAST_REWRITE_OUTPUT_FILE_SIZE =
"lastRewriteOutputFileSize";
+
+ @VisibleForTesting
+ static final String LAST_REWRITE_CHANGELOG_FILE_SIZE =
"lastRewriteChangelogFileSize";
+
+ private void registerGenericCompactionMetrics() {
+ metricGroup.gauge(
+ LAST_COMPACTION_DURATION,
+ () -> latestCompaction == null ? 0L :
latestCompaction.getDuration());
+ durationHistogram = metricGroup.histogram(COMPACTION_DURATION,
HISTOGRAM_WINDOW_SIZE);
+ metricGroup.gauge(
+ LAST_TABLE_FILES_COMPACTED_BEFORE,
+ () ->
+ latestCompaction == null
+ ? 0L
+ :
latestCompaction.getCompactedDataFilesBefore());
+ metricGroup.gauge(
+ LAST_TABLE_FILES_COMPACTED_AFTER,
+ () ->
+ latestCompaction == null
+ ? 0L
+ :
latestCompaction.getCompactedDataFilesAfter());
+ metricGroup.gauge(
+ LAST_CHANGELOG_FILES_COMPACTED,
+ () -> latestCompaction == null ? 0L :
latestCompaction.getCompactedChangelogs());
+ metricGroup.gauge(
+ LAST_REWRITE_INPUT_FILE_SIZE,
+ () -> latestCompaction == null ? 0L :
latestCompaction.getRewriteInputFileSize());
+ metricGroup.gauge(
+ LAST_REWRITE_OUTPUT_FILE_SIZE,
+ () -> latestCompaction == null ? 0L :
latestCompaction.getRewriteOutputFileSize());
+ metricGroup.gauge(
+ LAST_REWRITE_CHANGELOG_FILE_SIZE,
+ () ->
+ latestCompaction == null
+ ? 0L
+ :
latestCompaction.getRewriteChangelogFileSize());
+ }
+
+ public void reportCompaction(CompactionStats compactionStats) {
+ latestCompaction = compactionStats;
+ durationHistogram.update(compactionStats.getDuration());
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionStats.java
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionStats.java
new file mode 100644
index 000000000..ba71a2dad
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionStats.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation.metrics;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.io.DataFileMeta;
+
+import java.util.List;
+
+/** Statistics for a compaction. */
+public class CompactionStats {
+ private final long duration;
+ private final long compactedDataFilesBefore;
+ private final long compactedDataFilesAfter;
+ private final long compactedChangelogs;
+ private final long rewriteInputFileSize;
+ private final long rewriteOutputFileSize;
+ private final long rewriteChangelogFileSize;
+
+ public CompactionStats(
+ long compactionDuration,
+ List<DataFileMeta> compactBefore,
+ List<DataFileMeta> compactAfter,
+ List<DataFileMeta> compactChangelog) {
+ this.duration = compactionDuration;
+ this.compactedDataFilesBefore = compactBefore.size();
+ this.compactedDataFilesAfter = compactAfter.size();
+ this.compactedChangelogs = compactChangelog.size();
+ this.rewriteInputFileSize = rewriteFileSize(compactBefore);
+ this.rewriteOutputFileSize = rewriteFileSize(compactAfter);
+ this.rewriteChangelogFileSize = rewriteFileSize(compactChangelog);
+ }
+
+ @VisibleForTesting
+ protected long getDuration() {
+ return duration;
+ }
+
+ protected long getCompactedDataFilesBefore() {
+ return compactedDataFilesBefore;
+ }
+
+ protected long getCompactedDataFilesAfter() {
+ return compactedDataFilesAfter;
+ }
+
+ protected long getCompactedChangelogs() {
+ return compactedChangelogs;
+ }
+
+ protected long getRewriteInputFileSize() {
+ return rewriteInputFileSize;
+ }
+
+ protected long getRewriteOutputFileSize() {
+ return rewriteOutputFileSize;
+ }
+
+ protected long getRewriteChangelogFileSize() {
+ return rewriteChangelogFileSize;
+ }
+
+ private long rewriteFileSize(List<DataFileMeta> files) {
+ return files.stream().mapToLong(DataFileMeta::fileSize).sum();
+ }
+
+ @Override
+ public String toString() {
+ return "CompactionStats{"
+ + "duration="
+ + duration
+ + ", compactedDataFilesBefore="
+ + compactedDataFilesBefore
+ + ", compactedDataFilesAfter="
+ + compactedDataFilesAfter
+ + ", compactedChangelogs="
+ + compactedChangelogs
+ + ", rewriteInputFileSize="
+ + rewriteInputFileSize
+ + ", rewriteOutputFileSize="
+ + rewriteOutputFileSize
+ + ", rewriteChangelogFileSize="
+ + rewriteChangelogFileSize
+ + '}';
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 6f145ae9a..272870cc4 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -81,7 +81,8 @@ public class AppendOnlyFileStoreTable extends
AbstractFileStoreTable {
new CoreOptions(tableSchema.options()),
tableSchema.logicalPartitionType(),
tableSchema.logicalBucketKeyType(),
- tableSchema.logicalRowType());
+ tableSchema.logicalRowType(),
+ name());
}
return lazyStore;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index 78190fe97..02c6a36c2 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -106,7 +106,8 @@ public class PrimaryKeyFileStoreTable extends
AbstractFileStoreTable {
new RowType(extractor.keyFields(tableSchema)),
rowType,
extractor,
- mfFactory);
+ mfFactory,
+ name());
}
return lazyStore;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
index 57bfbd190..bf61cc57b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.table.Table;
/**
@@ -58,4 +59,7 @@ public interface TableWrite extends AutoCloseable {
* changelog.
*/
void compact(BinaryRow partition, int bucket, boolean fullCompaction)
throws Exception;
+
+ /** With metrics to measure compaction. */
+ TableWrite withMetricRegistry(MetricRegistry registry);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index d3fe25007..f34a38a44 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -26,6 +26,7 @@ import org.apache.paimon.disk.IOManager;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.AbstractFileStoreWrite;
import org.apache.paimon.operation.FileStoreWrite;
import org.apache.paimon.utils.Restorable;
@@ -146,6 +147,12 @@ public class TableWriteImpl<T>
write.compact(partition, bucket, fullCompaction);
}
+ @Override
+ public TableWriteImpl<T> withMetricRegistry(MetricRegistry metricRegistry)
{
+ write.withMetricRegistry(metricRegistry);
+ return this;
+ }
+
/**
* Notify that some new files are created at given snapshot in given
bucket.
*
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 6b3b3f8a0..fbf058f7f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -112,7 +112,8 @@ public class TestFileStore extends KeyValueFileStore {
keyType,
valueType,
keyValueFieldsExtractor,
- mfFactory);
+ mfFactory,
+ (new Path(root)).getName());
this.root = root;
this.fileIO = FileIOFinder.find(new Path(root));
this.keySerializer = new InternalRowSerializer(keyType);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyCompactManagerTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyCompactManagerTest.java
index dc4d250c8..68bc4db9c 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyCompactManagerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyCompactManagerTest.java
@@ -206,8 +206,8 @@ public class AppendOnlyCompactManagerTest {
minFileNum,
maxFileNum,
targetFileSize,
- null // not used
- );
+ null, // not used
+ null);
Optional<List<DataFileMeta>> actual = manager.pickCompactBefore();
assertThat(actual.isPresent()).isEqualTo(expectedPresent);
if (expectedPresent) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 62a0292a8..4eefdfef6 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -527,7 +527,8 @@ public class AppendOnlyWriterTest {
? Collections.emptyList()
: Collections.singletonList(
generateCompactAfter(compactBefore));
- });
+ },
+ null);
CoreOptions options = new CoreOptions(new HashMap<>());
AppendOnlyWriter writer =
new AppendOnlyWriter(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/FullCompactTaskTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/FullCompactTaskTest.java
index aab1f1ad8..5e474ccd4 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/FullCompactTaskTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/FullCompactTaskTest.java
@@ -123,7 +123,7 @@ public class FullCompactTaskTest {
Collection<DataFileMeta> inputs,
long targetFileSize,
AppendOnlyCompactManager.CompactRewriter rewriter) {
- super(inputs, targetFileSize, rewriter);
+ super(inputs, targetFileSize, rewriter, null);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index 3404214d1..c8f5ebd00 100644
---
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -76,7 +76,8 @@ public class FileFormatSuffixTest extends
KeyValueFileReadWriteTest {
10,
SCHEMA,
0,
- new AppendOnlyCompactManager(null, toCompact, 4, 10,
10, null), // not used
+ new AppendOnlyCompactManager(
+ null, toCompact, 4, 10, 10, null, null), //
not used
false,
dataFilePathFactory,
null,
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index 718d8714a..4e544b0e6 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -444,7 +444,8 @@ public abstract class MergeTreeTestBase {
comparator,
options.compactionFileSize(),
options.numSortedRunStopTrigger(),
- new TestRewriter());
+ new TestRewriter(),
+ null);
}
static class MockFailResultCompactionManager extends
MergeTreeCompactManager {
@@ -463,7 +464,8 @@ public abstract class MergeTreeTestBase {
keyComparator,
minFileSize,
numSortedRunStopTrigger,
- rewriter);
+ rewriter,
+ null);
}
protected CompactResult obtainCompactResult()
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
index 4de33bfe0..338013d32 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
@@ -204,7 +204,8 @@ public class MergeTreeCompactManagerTest {
comparator,
2,
Integer.MAX_VALUE,
- new TestRewriter(expectedDropDelete));
+ new TestRewriter(expectedDropDelete),
+ null);
manager.triggerCompaction(false);
manager.getCompactionResult(true);
List<LevelMinMax> outputs =
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactionMetricsTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactionMetricsTest.java
new file mode 100644
index 000000000..eb180d3c1
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactionMetricsTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation.metrics;
+
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFileTestUtils;
+import org.apache.paimon.metrics.Gauge;
+import org.apache.paimon.metrics.Histogram;
+import org.apache.paimon.metrics.Metric;
+import org.apache.paimon.metrics.MetricRegistryImpl;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.offset;
+
+/** Tests for {@link CompactionMetrics}. */
+public class CompactionMetricsTest {
+
+ private static final String TABLE_NAME = "myTable";
+ private static final String PARTITION = "date=20230623";
+ private static final int BUCKET = 5;
+
+ /** Tests that the metrics are updated properly. */
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testMetricsAreUpdated() {
+ CompactionMetrics compactionMetrics = getCompactionMetrics();
+ Map<String, Metric> registeredGenericMetrics =
+ compactionMetrics.getMetricGroup().getMetrics();
+
+ // Check initial values
+ Gauge<Long> lastCompactionDuration =
+ (Gauge<Long>)
+
registeredGenericMetrics.get(CompactionMetrics.LAST_COMPACTION_DURATION);
+ Histogram compactionDuration =
+ (Histogram)
registeredGenericMetrics.get(CompactionMetrics.COMPACTION_DURATION);
+ Gauge<Long> lastTableFilesCompactedBefore =
+ (Gauge<Long>)
+ registeredGenericMetrics.get(
+
CompactionMetrics.LAST_TABLE_FILES_COMPACTED_BEFORE);
+ Gauge<Long> lastTableFilesCompactedAfter =
+ (Gauge<Long>)
+ registeredGenericMetrics.get(
+
CompactionMetrics.LAST_TABLE_FILES_COMPACTED_AFTER);
+ Gauge<Long> lastChangelogFilesCompacted =
+ (Gauge<Long>)
+ registeredGenericMetrics.get(
+
CompactionMetrics.LAST_CHANGELOG_FILES_COMPACTED);
+ Gauge<Long> lastRewriteInputFileSize =
+ (Gauge<Long>)
+ registeredGenericMetrics.get(
+
CompactionMetrics.LAST_REWRITE_INPUT_FILE_SIZE);
+ Gauge<Long> lastRewriteOutputFileSize =
+ (Gauge<Long>)
+ registeredGenericMetrics.get(
+
CompactionMetrics.LAST_REWRITE_OUTPUT_FILE_SIZE);
+ Gauge<Long> lastRewriteChangelogFileSize =
+ (Gauge<Long>)
+ registeredGenericMetrics.get(
+
CompactionMetrics.LAST_REWRITE_CHANGELOG_FILE_SIZE);
+
+ assertThat(lastCompactionDuration.getValue()).isEqualTo(0);
+ assertThat(compactionDuration.getCount()).isEqualTo(0);
+ assertThat(compactionDuration.getStatistics().size()).isEqualTo(0);
+ assertThat(lastTableFilesCompactedBefore.getValue()).isEqualTo(0);
+ assertThat(lastTableFilesCompactedAfter.getValue()).isEqualTo(0);
+ assertThat(lastChangelogFilesCompacted.getValue()).isEqualTo(0);
+ assertThat(lastRewriteInputFileSize.getValue()).isEqualTo(0);
+ assertThat(lastRewriteOutputFileSize.getValue()).isEqualTo(0);
+ assertThat(lastRewriteChangelogFileSize.getValue()).isEqualTo(0);
+
+ // report once
+ reportOnce(compactionMetrics);
+
+ // generic metrics value updated
+ assertThat(lastCompactionDuration.getValue()).isEqualTo(3000);
+ assertThat(compactionDuration.getCount()).isEqualTo(1);
+ assertThat(compactionDuration.getStatistics().size()).isEqualTo(1);
+
assertThat(compactionDuration.getStatistics().getValues()[0]).isEqualTo(3000L);
+
assertThat(compactionDuration.getStatistics().getMin()).isEqualTo(3000);
+ assertThat(compactionDuration.getStatistics().getQuantile(0.5))
+ .isCloseTo(3000.0, offset(0.001));
+
assertThat(compactionDuration.getStatistics().getMean()).isEqualTo(3000);
+
assertThat(compactionDuration.getStatistics().getMax()).isEqualTo(3000);
+
assertThat(compactionDuration.getStatistics().getStdDev()).isEqualTo(0);
+ assertThat(lastTableFilesCompactedBefore.getValue()).isEqualTo(2);
+ assertThat(lastTableFilesCompactedAfter.getValue()).isEqualTo(2);
+ assertThat(lastChangelogFilesCompacted.getValue()).isEqualTo(2);
+ assertThat(lastRewriteInputFileSize.getValue()).isEqualTo(2001);
+ assertThat(lastRewriteOutputFileSize.getValue()).isEqualTo(1101);
+ assertThat(lastRewriteChangelogFileSize.getValue()).isEqualTo(3001);
+
+ // report again
+ reportAgain(compactionMetrics);
+
+ // generic metrics value updated
+ assertThat(lastCompactionDuration.getValue()).isEqualTo(6000);
+ assertThat(compactionDuration.getCount()).isEqualTo(2);
+ assertThat(compactionDuration.getStatistics().size()).isEqualTo(2);
+
assertThat(compactionDuration.getStatistics().getValues()[1]).isEqualTo(6000L);
+
assertThat(compactionDuration.getStatistics().getMin()).isEqualTo(3000);
+ assertThat(compactionDuration.getStatistics().getQuantile(0.5))
+ .isCloseTo(4500, offset(0.001));
+
assertThat(compactionDuration.getStatistics().getMean()).isEqualTo(4500);
+
assertThat(compactionDuration.getStatistics().getMax()).isEqualTo(6000);
+ assertThat(compactionDuration.getStatistics().getStdDev())
+ .isCloseTo(2121.320, offset(0.001));
+ assertThat(lastTableFilesCompactedBefore.getValue()).isEqualTo(2);
+ assertThat(lastTableFilesCompactedAfter.getValue()).isEqualTo(2);
+ assertThat(lastChangelogFilesCompacted.getValue()).isEqualTo(2);
+ assertThat(lastRewriteInputFileSize.getValue()).isEqualTo(2001);
+ assertThat(lastRewriteOutputFileSize.getValue()).isEqualTo(1201);
+ assertThat(lastRewriteChangelogFileSize.getValue()).isEqualTo(2501);
+ }
+
+ private void reportOnce(CompactionMetrics compactionMetrics) {
+ List<DataFileMeta> compactBefore = new ArrayList<>();
+ List<DataFileMeta> compactAfter = new ArrayList<>();
+ List<DataFileMeta> compactChangelog = new ArrayList<>();
+ compactBefore.add(DataFileTestUtils.newFile(0, 1000));
+ compactBefore.add(DataFileTestUtils.newFile(1001, 2000));
+ compactAfter.add(DataFileTestUtils.newFile(400, 1000));
+ compactAfter.add(DataFileTestUtils.newFile(1001, 1500));
+ compactChangelog.add(DataFileTestUtils.newFile(0, 2000));
+ compactChangelog.add(DataFileTestUtils.newFile(2001, 3000));
+
+ CompactionStats compactionStats =
+ new CompactionStats(3000, compactBefore, compactAfter,
compactChangelog);
+
+ compactionMetrics.reportCompaction(compactionStats);
+ }
+
+ private void reportAgain(CompactionMetrics compactionMetrics) {
+ List<DataFileMeta> compactBefore = new ArrayList<>();
+ List<DataFileMeta> compactAfter = new ArrayList<>();
+ List<DataFileMeta> compactChangelog = new ArrayList<>();
+ compactBefore.add(DataFileTestUtils.newFile(2000, 3000));
+ compactBefore.add(DataFileTestUtils.newFile(3001, 4000));
+ compactAfter.add(DataFileTestUtils.newFile(600, 1200));
+ compactAfter.add(DataFileTestUtils.newFile(1201, 1800));
+ compactChangelog.add(DataFileTestUtils.newFile(0, 1500));
+ compactChangelog.add(DataFileTestUtils.newFile(1501, 2500));
+
+ CompactionStats compactionStats =
+ new CompactionStats(6000, compactBefore, compactAfter,
compactChangelog);
+
+ compactionMetrics.reportCompaction(compactionStats);
+ }
+
+ private CompactionMetrics getCompactionMetrics() {
+ return new CompactionMetrics(new MetricRegistryImpl(), TABLE_NAME,
PARTITION, BUCKET);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index 92527df54..0f9d1cd30 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -189,7 +189,8 @@ public class TestChangelogDataReadWrite {
null, // not used, we only create an empty
writer
null,
options,
- EXTRACTOR)
+ EXTRACTOR,
+ tablePath.getName())
.createWriterContainer(partition, bucket, true)
.writer;
((MemoryOwner) writer)