This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 59451f74e [core] Remove bucket level metrics because metric names will
stay in Flink's jobmanager forever and will cause OOM (#2930)
59451f74e is described below
commit 59451f74e155c184052e493aa8614ff85693d6d6
Author: tsreaper <[email protected]>
AuthorDate: Tue Mar 5 15:12:52 2024 +0800
[core] Remove bucket level metrics because metric names will stay in
Flink's jobmanager forever and will cause OOM (#2930)
---
docs/content/maintenance/metrics.md | 117 +------------
.../paimon/append/AppendOnlyCompactManager.java | 25 +--
.../org/apache/paimon/append/AppendOnlyWriter.java | 24 +--
.../paimon/compact/CompactFutureManager.java | 5 -
.../org/apache/paimon/compact/CompactTask.java | 46 +++--
.../apache/paimon/mergetree/MergeTreeWriter.java | 27 +--
.../mergetree/compact/MergeTreeCompactManager.java | 29 ++--
.../mergetree/compact/MergeTreeCompactTask.java | 4 +-
.../org/apache/paimon/metrics/MetricRegistry.java | 12 --
.../paimon/operation/AbstractFileStoreWrite.java | 40 +----
.../paimon/operation/AppendOnlyFileStoreWrite.java | 9 +-
.../paimon/operation/KeyValueFileStoreWrite.java | 9 +-
.../paimon/operation/MemoryFileStoreWrite.java | 5 +-
.../paimon/operation/metrics/CompactTimer.java | 163 +++++++++++++++++
.../operation/metrics/CompactionMetrics.java | 162 ++++++++++-------
.../paimon/operation/metrics/CompactionStats.java | 102 -----------
.../paimon/operation/metrics/MetricUtils.java | 33 ++++
.../operation/metrics/WriterBufferMetric.java | 2 +-
.../paimon/operation/metrics/WriterMetrics.java | 68 --------
.../apache/paimon/append/AppendOnlyWriterTest.java | 3 +-
.../apache/paimon/format/FileFormatSuffixTest.java | 3 +-
.../apache/paimon/mergetree/MergeTreeTestBase.java | 1 -
.../paimon/operation/metrics/CompactTimerTest.java | 71 ++++++++
.../operation/metrics/CompactionMetricsTest.java | 175 +++----------------
.../cdc/CdcDynamicBucketWriteOperatorTest.java | 193 ---------------------
.../cdc/CdcRecordStoreMultiWriteOperatorTest.java | 88 ----------
.../sink/cdc/CdcRecordStoreWriteOperatorTest.java | 75 --------
.../paimon/flink/sink/CommitterOperatorTest.java | 28 ++-
.../paimon/flink/sink/CompactorSinkITCase.java | 191 --------------------
.../apache/paimon/flink/sink/FlinkSinkTest.java | 142 ---------------
.../paimon/flink/sink/StoreMultiCommitterTest.java | 32 +++-
.../paimon/flink/sink/WriterOperatorTestBase.java | 29 +---
.../flink/source/FileStoreSourceMetricsTest.java | 30 ++--
.../flink/source/operator/OperatorSourceTest.java | 12 +-
.../{MetricUtils.java => TestingMetricUtils.java} | 2 +-
35 files changed, 550 insertions(+), 1407 deletions(-)
diff --git a/docs/content/maintenance/metrics.md
b/docs/content/maintenance/metrics.md
index 973a8fcfe..5950c518e 100644
--- a/docs/content/maintenance/metrics.md
+++ b/docs/content/maintenance/metrics.md
@@ -28,7 +28,7 @@ under the License.
Paimon has built a metrics system to measure the behaviours of reading and
writing, like how many manifest files it scanned in the last planning, how long
it took in the last commit operation, how many files it deleted in the last
compact operation.
-In Paimon's metrics system, metrics are updated and reported at different
levels of granularity. Currently, the levels of **table** and **bucket** are
provided, which means you can get metrics per table or bucket.
+In Paimon's metrics system, metrics are updated and reported at table
granularity.
There are three types of metrics provided in the Paimon metric system,
`Gauge`, `Counter`, `Histogram`.
- `Gauge`: Provides a value of any type at a point in time.
@@ -47,7 +47,6 @@ Below is lists of Paimon built-in metrics. They are
summarized into types of sca
<thead>
<tr>
<th class="text-left" style="width: 225pt">Metrics Name</th>
- <th class="text-left" style="width: 65pt">Level</th>
<th class="text-left" style="width: 70pt">Type</th>
<th class="text-left" style="width: 300pt">Description</th>
</tr>
@@ -55,49 +54,41 @@ Below is lists of Paimon built-in metrics. They are
summarized into types of sca
<tbody>
<tr>
<td>lastScanDuration</td>
- <td>Table</td>
<td>Gauge</td>
<td>The time it took to complete the last scan.</td>
</tr>
<tr>
<td>scanDuration</td>
- <td>Table</td>
<td>Histogram</td>
<td>Distributions of the time taken by the last few scans.</td>
</tr>
<tr>
<td>lastScannedManifests</td>
- <td>Table</td>
<td>Gauge</td>
<td>Number of scanned manifest files in the last scan.</td>
</tr>
<tr>
<td>lastSkippedByPartitionAndStats</td>
- <td>Table</td>
<td>Gauge</td>
<td>Skipped table files by partition filter and value / key stats
information in the last scan.</td>
</tr>
<tr>
<td>lastSkippedByBucketAndLevelFilter</td>
- <td>Table</td>
<td>Gauge</td>
<td>Skipped table files by bucket, bucket key and level filter in
the last scan.</td>
</tr>
<tr>
<td>lastSkippedByWholeBucketFilesFilter</td>
- <td>Table</td>
<td>Gauge</td>
<td>Skipped table files by bucket level value filter (only primary
key table) in the last scan.</td>
</tr>
<tr>
<td>lastScanSkippedTableFiles</td>
- <td>Table</td>
<td>Gauge</td>
<td>Total skipped table files in the last scan.</td>
</tr>
<tr>
<td>lastScanResultedTableFiles</td>
- <td>Table</td>
<td>Gauge</td>
<td>Resulted table files in the last scan.</td>
</tr>
@@ -110,7 +101,6 @@ Below is lists of Paimon built-in metrics. They are
summarized into types of sca
<thead>
<tr>
<th class="text-left" style="width: 225pt">Metrics Name</th>
- <th class="text-left" style="width: 65pt">Level</th>
<th class="text-left" style="width: 70pt">Type</th>
<th class="text-left" style="width: 300pt">Description</th>
</tr>
@@ -118,143 +108,93 @@ Below is lists of Paimon built-in metrics. They are
summarized into types of sca
<tbody>
<tr>
<td>lastCommitDuration</td>
- <td>Table</td>
<td>Gauge</td>
<td>The time it took to complete the last commit.</td>
</tr>
<tr>
<td>commitDuration</td>
- <td>Table</td>
<td>Histogram</td>
<td>Distributions of the time taken by the last few commits.</td>
</tr>
<tr>
<td>lastCommitAttempts</td>
- <td>Table</td>
<td>Gauge</td>
<td>The number of attempts the last commit made.</td>
</tr>
<tr>
<td>lastTableFilesAdded</td>
- <td>Table</td>
<td>Gauge</td>
<td>Number of added table files in the last commit, including
newly created data files and compacted after.</td>
</tr>
<tr>
<td>lastTableFilesDeleted</td>
- <td>Table</td>
<td>Gauge</td>
<td>Number of deleted table files in the last commit, which comes
from compacted before.</td>
</tr>
<tr>
<td>lastTableFilesAppended</td>
- <td>Table</td>
<td>Gauge</td>
<td>Number of appended table files in the last commit, which means
the newly created data files.</td>
</tr>
<tr>
<td>lastTableFilesCommitCompacted</td>
- <td>Table</td>
<td>Gauge</td>
<td>Number of compacted table files in the last commit, including
compacted before and after.</td>
</tr>
<tr>
<td>lastChangelogFilesAppended</td>
- <td>Table</td>
<td>Gauge</td>
<td>Number of appended changelog files in last commit.</td>
</tr>
<tr>
<td>lastChangelogFileCommitCompacted</td>
- <td>Table</td>
<td>Gauge</td>
<td>Number of compacted changelog files in last commit.</td>
</tr>
<tr>
<td>lastGeneratedSnapshots</td>
- <td>Table</td>
<td>Gauge</td>
<td>Number of snapshot files generated in the last commit, maybe 1
snapshot or 2 snapshots.</td>
</tr>
<tr>
<td>lastDeltaRecordsAppended</td>
- <td>Table</td>
<td>Gauge</td>
<td>Delta records count in last commit with APPEND commit
kind.</td>
</tr>
<tr>
<td>lastChangelogRecordsAppended</td>
- <td>Table</td>
<td>Gauge</td>
<td>Changelog records count in last commit with APPEND commit
kind.</td>
</tr>
<tr>
<td>lastDeltaRecordsCommitCompacted</td>
- <td>Table</td>
<td>Gauge</td>
<td>Delta records count in last commit with COMPACT commit
kind.</td>
</tr>
<tr>
<td>lastChangelogRecordsCommitCompacted</td>
- <td>Table</td>
<td>Gauge</td>
<td>Changelog records count in last commit with COMPACT commit
kind.</td>
</tr>
<tr>
<td>lastPartitionsWritten</td>
- <td>Table</td>
<td>Gauge</td>
<td>Number of partitions written in the last commit.</td>
</tr>
<tr>
<td>lastBucketsWritten</td>
- <td>Table</td>
<td>Gauge</td>
<td>Number of buckets written in the last commit.</td>
</tr>
</tbody>
</table>
-### Write Metrics
-
-<table class="table table-bordered">
- <thead>
- <tr>
- <th class="text-left" style="width: 225pt">Metrics Name</th>
- <th class="text-left" style="width: 65pt">Level</th>
- <th class="text-left" style="width: 70pt">Type</th>
- <th class="text-left" style="width: 300pt">Description</th>
- </tr>
- </thead>
- <tbody>
- <tr>
- <td>writeRecordCount</td>
- <td>Bucket</td>
- <td>Counter</td>
- <td>Total number of records written into the bucket.</td>
- </tr>
- <tr>
- <td>flushCostMillis</td>
- <td>Bucket</td>
- <td>Histogram</td>
- <td>Distributions of the time taken by the last few write buffer
flushing.</td>
- </tr>
- <tr>
- <td>prepareCommitCostMillis</td>
- <td>Bucket</td>
- <td>Histogram</td>
- <td>Distributions of the time taken by the last few call of
`prepareCommit`.</td>
- </tr>
- </tbody>
-</table>
-
### Write Buffer Metrics
<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 225pt">Metrics Name</th>
- <th class="text-left" style="width: 65pt">Level</th>
<th class="text-left" style="width: 70pt">Type</th>
<th class="text-left" style="width: 300pt">Description</th>
</tr>
@@ -262,19 +202,16 @@ Below is lists of Paimon built-in metrics. They are
summarized into types of sca
<tbody>
<tr>
<td>bufferPreemptCount</td>
- <td>Table</td>
<td>Gauge</td>
<td>The total number of memory preempted.</td>
</tr>
<tr>
<td>usedWriteBufferSizeByte</td>
- <td>Table</td>
<td>Gauge</td>
<td>Current used write buffer size in byte.</td>
</tr>
<tr>
<td>totalWriteBufferSizeByte</td>
- <td>Table</td>
<td>Gauge</td>
<td>The total write buffer size configured in byte.</td>
</tr>
@@ -287,65 +224,25 @@ Below is lists of Paimon built-in metrics. They are
summarized into types of sca
<thead>
<tr>
<th class="text-left" style="width: 225pt">Metrics Name</th>
- <th class="text-left" style="width: 65pt">Level</th>
<th class="text-left" style="width: 70pt">Type</th>
<th class="text-left" style="width: 300pt">Description</th>
</tr>
</thead>
<tbody>
<tr>
- <td>level0FileCount</td>
- <td>Bucket</td>
- <td>Gauge</td>
- <td>The level 0 file count will become larger if asynchronous
compaction cannot be done in time.</td>
- </tr>
- <tr>
- <td>lastCompactionDuration</td>
- <td>Bucket</td>
- <td>Gauge</td>
- <td>The time it took to complete the last compaction.</td>
- </tr>
- <tr>
- <td>compactionDuration</td>
- <td>Bucket</td>
- <td>Histogram</td>
- <td>Distributions of the time taken by the last few
compaction.</td>
- </tr>
- <tr>
- <td>lastTableFilesCompactedBefore</td>
- <td>Bucket</td>
- <td>Gauge</td>
- <td>Number of deleted files in the last compaction.</td>
- </tr>
- <tr>
- <td>lastTableFilesCompactedAfter</td>
- <td>Bucket</td>
- <td>Gauge</td>
- <td>Number of added files in the last compaction.</td>
- </tr>
- <tr>
- <td>lastChangelogFilesCompacted</td>
- <td>Bucket</td>
- <td>Gauge</td>
- <td>Number of changelog files compacted in last compaction.</td>
- </tr>
- <tr>
- <td>lastRewriteInputFileSize</td>
- <td>Bucket</td>
+ <td>maxLevel0FileCount</td>
<td>Gauge</td>
- <td>Size of deleted files in the last compaction.</td>
+ <td>The maximum number of level 0 files currently handled by this
writer. This value will become larger if asynchronous compaction cannot be done
in time.</td>
</tr>
<tr>
- <td>lastRewriteOutputFileSize</td>
- <td>Bucket</td>
+ <td>avgLevel0FileCount</td>
<td>Gauge</td>
- <td>Size of added files in the last compaction.</td>
+ <td>The average number of level 0 files currently handled by this
writer. This value will become larger if asynchronous compaction cannot be done
in time.</td>
</tr>
<tr>
- <td>lastRewriteChangelogFileSize</td>
- <td>Bucket</td>
+ <td>compactionThreadBusy</td>
<td>Gauge</td>
- <td>Size of changelog files compacted in last compaction.</td>
+ <td>The maximum business of compaction threads in this
parallelism. Currently, there is only one compaction thread in each
parallelism, so value of business ranges from 0 (idle) to 100 (compaction
running all the time).</td>
</tr>
</tbody>
</table>
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactManager.java
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactManager.java
index 58106e070..7d4346bdb 100644
---
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactManager.java
+++
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactManager.java
@@ -25,6 +25,7 @@ import org.apache.paimon.compact.CompactResult;
import org.apache.paimon.compact.CompactTask;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.operation.metrics.CompactionMetrics;
+import org.apache.paimon.operation.metrics.MetricUtils;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
@@ -59,7 +60,7 @@ public class AppendOnlyCompactManager extends
CompactFutureManager {
private List<DataFileMeta> compacting;
- @Nullable private final CompactionMetrics metrics;
+ @Nullable private final CompactionMetrics.Reporter metricsReporter;
public AppendOnlyCompactManager(
ExecutorService executor,
@@ -68,7 +69,7 @@ public class AppendOnlyCompactManager extends
CompactFutureManager {
int maxFileNum,
long targetFileSize,
CompactRewriter rewriter,
- @Nullable CompactionMetrics metrics) {
+ @Nullable CompactionMetrics.Reporter metricsReporter) {
this.executor = executor;
this.toCompact = new TreeSet<>(fileComparator(false));
this.toCompact.addAll(restored);
@@ -76,7 +77,7 @@ public class AppendOnlyCompactManager extends
CompactFutureManager {
this.maxFileNum = maxFileNum;
this.targetFileSize = targetFileSize;
this.rewriter = rewriter;
- this.metrics = metrics;
+ this.metricsReporter = metricsReporter;
}
@Override
@@ -98,7 +99,8 @@ public class AppendOnlyCompactManager extends
CompactFutureManager {
}
taskFuture =
- executor.submit(new FullCompactTask(toCompact, targetFileSize,
rewriter, metrics));
+ executor.submit(
+ new FullCompactTask(toCompact, targetFileSize,
rewriter, metricsReporter));
compacting = new ArrayList<>(toCompact);
toCompact.clear();
}
@@ -110,7 +112,8 @@ public class AppendOnlyCompactManager extends
CompactFutureManager {
Optional<List<DataFileMeta>> picked = pickCompactBefore();
if (picked.isPresent()) {
compacting = picked.get();
- taskFuture = executor.submit(new AutoCompactTask(compacting,
rewriter, metrics));
+ taskFuture =
+ executor.submit(new AutoCompactTask(compacting, rewriter,
metricsReporter));
}
}
@@ -196,8 +199,8 @@ public class AppendOnlyCompactManager extends
CompactFutureManager {
@Override
public void close() throws IOException {
- if (metrics != null) {
- metrics.close();
+ if (metricsReporter != null) {
+ MetricUtils.safeCall(metricsReporter::unregister, LOG);
}
}
@@ -212,8 +215,8 @@ public class AppendOnlyCompactManager extends
CompactFutureManager {
Collection<DataFileMeta> inputs,
long targetFileSize,
CompactRewriter rewriter,
- @Nullable CompactionMetrics metrics) {
- super(metrics);
+ @Nullable CompactionMetrics.Reporter metricsReporter) {
+ super(metricsReporter);
this.inputs = new LinkedList<>(inputs);
this.targetFileSize = targetFileSize;
this.rewriter = rewriter;
@@ -268,8 +271,8 @@ public class AppendOnlyCompactManager extends
CompactFutureManager {
public AutoCompactTask(
List<DataFileMeta> toCompact,
CompactRewriter rewriter,
- @Nullable CompactionMetrics metrics) {
- super(metrics);
+ @Nullable CompactionMetrics.Reporter metricsReporter) {
+ super(metricsReporter);
this.toCompact = toCompact;
this.rewriter = rewriter;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 0810f023b..b8344b301 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -33,7 +33,6 @@ import org.apache.paimon.io.NewFilesIncrement;
import org.apache.paimon.io.RowDataRollingFileWriter;
import org.apache.paimon.memory.MemoryOwner;
import org.apache.paimon.memory.MemorySegmentPool;
-import org.apache.paimon.operation.metrics.WriterMetrics;
import org.apache.paimon.statistics.FieldStatsCollector;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
@@ -75,7 +74,6 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow>, MemoryOwner
private final IOManager ioManager;
private MemorySegmentPool memorySegmentPool;
- private WriterMetrics writerMetrics;
public AppendOnlyWriter(
FileIO fileIO,
@@ -92,8 +90,7 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow>, MemoryOwner
boolean useWriteBuffer,
boolean spillable,
String fileCompression,
- FieldStatsCollector.Factory[] statsCollectors,
- WriterMetrics writerMetrics) {
+ FieldStatsCollector.Factory[] statsCollectors) {
this.fileIO = fileIO;
this.schemaId = schemaId;
this.fileFormat = fileFormat;
@@ -118,7 +115,6 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow>, MemoryOwner
compactBefore.addAll(increment.compactIncrement().compactBefore());
compactAfter.addAll(increment.compactIncrement().compactAfter());
}
- this.writerMetrics = writerMetrics;
}
@Override
@@ -138,10 +134,6 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow>, MemoryOwner
throw new RuntimeException("Mem table is too small to hold a
single element.");
}
}
-
- if (writerMetrics != null) {
- writerMetrics.incWriteRecordNum();
- }
}
@Override
@@ -161,14 +153,9 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow>, MemoryOwner
@Override
public CommitIncrement prepareCommit(boolean waitCompaction) throws
Exception {
- long start = System.currentTimeMillis();
flush(false, false);
trySyncLatestCompaction(waitCompaction || forceCompact);
- CommitIncrement increment = drainIncrement();
- if (writerMetrics != null) {
-
writerMetrics.updatePrepareCommitCostMillis(System.currentTimeMillis() - start);
- }
- return increment;
+ return drainIncrement();
}
@Override
@@ -178,7 +165,6 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow>, MemoryOwner
@VisibleForTesting
void flush(boolean waitForLatestCompaction, boolean forcedFullCompaction)
throws Exception {
- long start = System.currentTimeMillis();
List<DataFileMeta> flushedFiles = sinkWriter.flush();
// add new generated files
@@ -186,9 +172,6 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow>, MemoryOwner
trySyncLatestCompaction(waitForLatestCompaction);
compactManager.triggerCompaction(forcedFullCompaction);
newFiles.addAll(flushedFiles);
- if (writerMetrics != null) {
-
writerMetrics.updateBufferFlushCostMillis(System.currentTimeMillis() - start);
- }
}
@Override
@@ -198,9 +181,6 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow>, MemoryOwner
@Override
public void close() throws Exception {
- if (writerMetrics != null) {
- writerMetrics.close();
- }
// cancel compaction so that it does not block job cancelling
compactManager.cancelCompaction();
sync();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/compact/CompactFutureManager.java
b/paimon-core/src/main/java/org/apache/paimon/compact/CompactFutureManager.java
index 7a8dc8da0..8b8f7e533 100644
---
a/paimon-core/src/main/java/org/apache/paimon/compact/CompactFutureManager.java
+++
b/paimon-core/src/main/java/org/apache/paimon/compact/CompactFutureManager.java
@@ -20,9 +20,6 @@ package org.apache.paimon.compact;
import org.apache.paimon.annotation.VisibleForTesting;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
@@ -31,8 +28,6 @@ import java.util.concurrent.Future;
/** Base implementation of {@link CompactManager} which runs compaction in a
separate thread. */
public abstract class CompactFutureManager implements CompactManager {
- private static final Logger LOG =
LoggerFactory.getLogger(CompactFutureManager.class);
-
protected Future<CompactResult> taskFuture;
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java
b/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java
index dad4aa374..dac100517 100644
--- a/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java
+++ b/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java
@@ -20,14 +20,13 @@ package org.apache.paimon.compact;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.operation.metrics.CompactionMetrics;
-import org.apache.paimon.operation.metrics.CompactionStats;
+import org.apache.paimon.operation.metrics.MetricUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
-import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
@@ -35,40 +34,37 @@ import java.util.concurrent.Callable;
public abstract class CompactTask implements Callable<CompactResult> {
private static final Logger LOG =
LoggerFactory.getLogger(CompactTask.class);
- @Nullable private final CompactionMetrics metrics;
- public CompactTask(@Nullable CompactionMetrics metrics) {
- this.metrics = metrics;
+ @Nullable private final CompactionMetrics.Reporter metricsReporter;
+
+ public CompactTask(@Nullable CompactionMetrics.Reporter metricsReporter) {
+ this.metricsReporter = metricsReporter;
}
@Override
public CompactResult call() throws Exception {
- long startMillis = System.currentTimeMillis();
- CompactResult result = null;
+ MetricUtils.safeCall(this::startTimer, LOG);
try {
- result = doCompact();
-
+ long startMillis = System.currentTimeMillis();
+ CompactResult result = doCompact();
if (LOG.isDebugEnabled()) {
logMetric(startMillis, result.before(), result.after());
}
return result;
} finally {
- if (metrics != null) {
- long duration = System.currentTimeMillis() - startMillis;
- CompactionStats compactionStats =
- result == null
- ? new CompactionStats(
- duration,
- Collections.emptyList(),
- Collections.emptyList(),
- Collections.emptyList())
- : new CompactionStats(
- duration,
- result.before(),
- result.after(),
- result.changelog());
- metrics.reportCompaction(compactionStats);
- }
+ MetricUtils.safeCall(this::stopTimer, LOG);
+ }
+ }
+
+ private void startTimer() {
+ if (metricsReporter != null) {
+ metricsReporter.getCompactTimer().start();
+ }
+ }
+
+ private void stopTimer() {
+ if (metricsReporter != null) {
+ metricsReporter.getCompactTimer().finish();
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
index de5cbf14b..046fad893 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
@@ -33,7 +33,6 @@ import org.apache.paimon.io.RollingFileWriter;
import org.apache.paimon.memory.MemoryOwner;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.mergetree.compact.MergeFunction;
-import org.apache.paimon.operation.metrics.WriterMetrics;
import org.apache.paimon.table.sink.SequenceGenerator;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommitIncrement;
@@ -78,8 +77,6 @@ public class MergeTreeWriter implements
RecordWriter<KeyValue>, MemoryOwner {
private long newSequenceNumber;
private WriteBuffer writeBuffer;
- private final WriterMetrics writerMetrics;
-
public MergeTreeWriter(
boolean writeBufferSpillable,
int sortMaxFan,
@@ -93,8 +90,7 @@ public class MergeTreeWriter implements
RecordWriter<KeyValue>, MemoryOwner {
boolean commitForceCompact,
ChangelogProducer changelogProducer,
@Nullable CommitIncrement increment,
- @Nullable SequenceGenerator sequenceGenerator,
- WriterMetrics writerMetrics) {
+ @Nullable SequenceGenerator sequenceGenerator) {
this.writeBufferSpillable = writeBufferSpillable;
this.sortMaxFan = sortMaxFan;
this.sortCompression = sortCompression;
@@ -125,7 +121,6 @@ public class MergeTreeWriter implements
RecordWriter<KeyValue>, MemoryOwner {
compactAfter.addAll(increment.compactIncrement().compactAfter());
compactChangelog.addAll(increment.compactIncrement().changelogFiles());
}
- this.writerMetrics = writerMetrics;
}
private long newSequenceNumber() {
@@ -164,10 +159,6 @@ public class MergeTreeWriter implements
RecordWriter<KeyValue>, MemoryOwner {
throw new RuntimeException("Mem table is too small to hold a
single element.");
}
}
-
- if (writerMetrics != null) {
- writerMetrics.incWriteRecordNum();
- }
}
@Override
@@ -200,7 +191,6 @@ public class MergeTreeWriter implements
RecordWriter<KeyValue>, MemoryOwner {
private void flushWriteBuffer(boolean waitForLatestCompaction, boolean
forcedFullCompaction)
throws Exception {
- long start = System.currentTimeMillis();
if (writeBuffer.size() > 0) {
if (compactManager.shouldWaitForLatestCompaction()) {
waitForLatestCompaction = true;
@@ -240,26 +230,16 @@ public class MergeTreeWriter implements
RecordWriter<KeyValue>, MemoryOwner {
trySyncLatestCompaction(waitForLatestCompaction);
compactManager.triggerCompaction(forcedFullCompaction);
- if (writerMetrics != null) {
-
writerMetrics.updateBufferFlushCostMillis(System.currentTimeMillis() - start);
- }
}
@Override
public CommitIncrement prepareCommit(boolean waitCompaction) throws
Exception {
- long start = System.currentTimeMillis();
flushWriteBuffer(waitCompaction, false);
trySyncLatestCompaction(
waitCompaction
|| commitForceCompact
|| compactManager.shouldWaitForPreparingCheckpoint());
-
- CommitIncrement increment = drainIncrement();
-
- if (writerMetrics != null) {
-
writerMetrics.updatePrepareCommitCostMillis(System.currentTimeMillis() - start);
- }
- return increment;
+ return drainIncrement();
}
@Override
@@ -320,9 +300,6 @@ public class MergeTreeWriter implements
RecordWriter<KeyValue>, MemoryOwner {
@Override
public void close() throws Exception {
- if (writerMetrics != null) {
- writerMetrics.close();
- }
// cancel compaction so that it does not block job cancelling
compactManager.cancelCompaction();
sync();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
index 4da1e2038..6d94ad574 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
@@ -28,6 +28,7 @@ import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.mergetree.LevelSortedRun;
import org.apache.paimon.mergetree.Levels;
import org.apache.paimon.operation.metrics.CompactionMetrics;
+import org.apache.paimon.operation.metrics.MetricUtils;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
@@ -56,7 +57,7 @@ public class MergeTreeCompactManager extends
CompactFutureManager {
private final int numSortedRunStopTrigger;
private final CompactRewriter rewriter;
- @Nullable private final CompactionMetrics metrics;
+ @Nullable private final CompactionMetrics.Reporter metricsReporter;
public MergeTreeCompactManager(
ExecutorService executor,
@@ -66,7 +67,7 @@ public class MergeTreeCompactManager extends
CompactFutureManager {
long compactionFileSize,
int numSortedRunStopTrigger,
CompactRewriter rewriter,
- @Nullable CompactionMetrics metrics) {
+ @Nullable CompactionMetrics.Reporter metricsReporter) {
this.executor = executor;
this.levels = levels;
this.strategy = strategy;
@@ -74,8 +75,9 @@ public class MergeTreeCompactManager extends
CompactFutureManager {
this.numSortedRunStopTrigger = numSortedRunStopTrigger;
this.keyComparator = keyComparator;
this.rewriter = rewriter;
- this.metrics = metrics;
- reportLevel0FileCount();
+ this.metricsReporter = metricsReporter;
+
+ MetricUtils.safeCall(this::reportLevel0FileCount, LOG);
}
@Override
@@ -92,7 +94,7 @@ public class MergeTreeCompactManager extends
CompactFutureManager {
@Override
public void addNewFile(DataFileMeta file) {
levels.addLevel0File(file);
- reportLevel0FileCount();
+ MetricUtils.safeCall(this::reportLevel0FileCount, LOG);
}
@Override
@@ -171,7 +173,12 @@ public class MergeTreeCompactManager extends
CompactFutureManager {
private void submitCompaction(CompactUnit unit, boolean dropDelete) {
MergeTreeCompactTask task =
new MergeTreeCompactTask(
- keyComparator, compactionFileSize, rewriter, unit,
dropDelete, metrics);
+ keyComparator,
+ compactionFileSize,
+ rewriter,
+ unit,
+ dropDelete,
+ metricsReporter);
if (LOG.isDebugEnabled()) {
LOG.debug(
"Pick these files (name, level, size) for compaction: {}",
@@ -200,7 +207,7 @@ public class MergeTreeCompactManager extends
CompactFutureManager {
r.after());
}
levels.update(r.before(), r.after());
- reportLevel0FileCount();
+ MetricUtils.safeCall(this::reportLevel0FileCount, LOG);
if (LOG.isDebugEnabled()) {
LOG.debug(
"Levels in compact manager updated. Current
runs are\n{}",
@@ -211,16 +218,16 @@ public class MergeTreeCompactManager extends
CompactFutureManager {
}
private void reportLevel0FileCount() {
- if (metrics != null) {
- metrics.reportLevel0FileCount(levels.level0().size());
+ if (metricsReporter != null) {
+ metricsReporter.reportLevel0FileCount(levels.level0().size());
}
}
@Override
public void close() throws IOException {
rewriter.close();
- if (metrics != null) {
- metrics.close();
+ if (metricsReporter != null) {
+ MetricUtils.safeCall(metricsReporter::unregister, LOG);
}
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
index 4fc11b076..7299fbb5c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
@@ -54,8 +54,8 @@ public class MergeTreeCompactTask extends CompactTask {
CompactRewriter rewriter,
CompactUnit unit,
boolean dropDelete,
- @Nullable CompactionMetrics metrics) {
- super(metrics);
+ @Nullable CompactionMetrics.Reporter metricsReporter) {
+ super(metricsReporter);
this.minFileSize = minFileSize;
this.rewriter = rewriter;
this.outputLevel = unit.outputLevel();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/metrics/MetricRegistry.java
b/paimon-core/src/main/java/org/apache/paimon/metrics/MetricRegistry.java
index 60f0e2a07..25bcac4cb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/metrics/MetricRegistry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/metrics/MetricRegistry.java
@@ -25,8 +25,6 @@ import java.util.Map;
public abstract class MetricRegistry {
private static final String KEY_TABLE = "table";
- private static final String KEY_PARTITION = "partition";
- private static final String KEY_BUCKET = "bucket";
public MetricGroup tableMetricGroup(String groupName, String tableName) {
Map<String, String> variables = new LinkedHashMap<>();
@@ -35,16 +33,6 @@ public abstract class MetricRegistry {
return createMetricGroup(groupName, variables);
}
- public MetricGroup bucketMetricGroup(
- String groupName, String tableName, String partition, int bucket) {
- Map<String, String> variables = new LinkedHashMap<>();
- variables.put(KEY_TABLE, tableName);
- variables.put(KEY_PARTITION, partition);
- variables.put(KEY_BUCKET, String.valueOf(bucket));
-
- return createMetricGroup(groupName, variables);
- }
-
protected abstract MetricGroup createMetricGroup(
String groupName, Map<String, String> variables);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index 7b119b858..af78e3463 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -22,7 +22,6 @@ import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.index.IndexMaintainer;
import org.apache.paimon.io.DataFileMeta;
@@ -31,12 +30,10 @@ import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.metrics.CompactionMetrics;
-import org.apache.paimon.operation.metrics.WriterMetrics;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.ExecutorThreadFactory;
-import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.RecordWriter;
import org.apache.paimon.utils.SnapshotManager;
@@ -77,10 +74,9 @@ public abstract class AbstractFileStoreWrite<T> implements
FileStoreWrite<T> {
private boolean closeCompactExecutorWhenLeaving = true;
private boolean ignorePreviousFiles = false;
protected boolean isStreamingMode = false;
- private MetricRegistry metricRegistry = null;
+ protected CompactionMetrics compactionMetrics = null;
protected final String tableName;
- private final FileStorePathFactory pathFactory;
protected AbstractFileStoreWrite(
String commitUser,
@@ -88,7 +84,6 @@ public abstract class AbstractFileStoreWrite<T> implements
FileStoreWrite<T> {
FileStoreScan scan,
@Nullable IndexMaintainer.Factory<T> indexFactory,
String tableName,
- FileStorePathFactory pathFactory,
int writerNumberMax) {
this.commitUser = commitUser;
this.snapshotManager = snapshotManager;
@@ -97,7 +92,6 @@ public abstract class AbstractFileStoreWrite<T> implements
FileStoreWrite<T> {
this.writers = new HashMap<>();
this.tableName = tableName;
- this.pathFactory = pathFactory;
this.writerNumberMax = writerNumberMax;
}
@@ -259,6 +253,9 @@ public abstract class AbstractFileStoreWrite<T> implements
FileStoreWrite<T> {
if (lazyCompactExecutor != null && closeCompactExecutorWhenLeaving) {
lazyCompactExecutor.shutdownNow();
}
+ if (compactionMetrics != null) {
+ compactionMetrics.close();
+ }
}
@Override
@@ -376,37 +373,10 @@ public abstract class AbstractFileStoreWrite<T>
implements FileStoreWrite<T> {
@Override
public FileStoreWrite<T> withMetricRegistry(MetricRegistry metricRegistry)
{
- this.metricRegistry = metricRegistry;
+ this.compactionMetrics = new CompactionMetrics(metricRegistry,
tableName);
return this;
}
- @Nullable
- public CompactionMetrics getCompactionMetrics(BinaryRow partition, int
bucket) {
- if (metricRegistry != null) {
- return new CompactionMetrics(
- metricRegistry, tableName, getPartitionString(pathFactory,
partition), bucket);
- }
- return null;
- }
-
- @Nullable
- public WriterMetrics getWriterMetrics(BinaryRow partition, int bucket) {
- if (this.metricRegistry != null) {
- return new WriterMetrics(
- metricRegistry, tableName, getPartitionString(pathFactory,
partition), bucket);
- }
- return null;
- }
-
- private String getPartitionString(FileStorePathFactory pathFactory,
BinaryRow partition) {
- String partitionStr =
-
pathFactory.getPartitionString(partition).replace(Path.SEPARATOR, "_");
- if (partitionStr.length() > 0) {
- return partitionStr.substring(0, partitionStr.length() - 1);
- }
- return "_";
- }
-
private List<DataFileMeta> scanExistingFileMetas(
long snapshotId, BinaryRow partition, int bucket) {
List<DataFileMeta> existingFileMetas = new ArrayList<>();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index a76848b5e..283df7b07 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -85,7 +85,7 @@ public class AppendOnlyFileStoreWrite extends
MemoryFileStoreWrite<InternalRow>
FileStoreScan scan,
CoreOptions options,
String tableName) {
- super(commitUser, snapshotManager, scan, options, null, tableName,
pathFactory);
+ super(commitUser, snapshotManager, scan, options, null, tableName);
this.fileIO = fileIO;
this.read = read;
this.schemaId = schemaId;
@@ -125,7 +125,9 @@ public class AppendOnlyFileStoreWrite extends
MemoryFileStoreWrite<InternalRow>
compactionMaxFileNum,
targetFileSize,
compactRewriter(partition, bucket),
- getCompactionMetrics(partition, bucket));
+ compactionMetrics == null
+ ? null
+ :
compactionMetrics.createReporter(partition, bucket));
return new AppendOnlyWriter(
fileIO,
@@ -142,8 +144,7 @@ public class AppendOnlyFileStoreWrite extends
MemoryFileStoreWrite<InternalRow>
useWriteBuffer || forceBufferSpill,
spillable || forceBufferSpill,
fileCompression,
- statsCollectors,
- getWriterMetrics(partition, bucket));
+ statsCollectors);
}
public AppendOnlyCompactManager.CompactRewriter compactRewriter(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 6a18a34dc..269ff2dc5 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -110,7 +110,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
CoreOptions options,
KeyValueFieldsExtractor extractor,
String tableName) {
- super(commitUser, snapshotManager, scan, options, indexFactory,
tableName, pathFactory);
+ super(commitUser, snapshotManager, scan, options, indexFactory,
tableName);
this.fileIO = fileIO;
this.keyType = keyType;
this.valueType = valueType;
@@ -186,8 +186,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
options.commitForceCompact(),
options.changelogProducer(),
restoreIncrement,
- SequenceGenerator.create(schema, options),
- getWriterMetrics(partition, bucket));
+ SequenceGenerator.create(schema, options));
}
@VisibleForTesting
@@ -214,7 +213,9 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
options.compactionFileSize(),
options.numSortedRunStopTrigger(),
rewriter,
- getCompactionMetrics(partition, bucket));
+ compactionMetrics == null
+ ? null
+ : compactionMetrics.createReporter(partition,
bucket));
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
index b49824f01..9159965d9 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
@@ -26,7 +26,6 @@ import org.apache.paimon.memory.MemoryOwner;
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.metrics.WriterBufferMetric;
-import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.RecordWriter;
import org.apache.paimon.utils.SnapshotManager;
@@ -61,15 +60,13 @@ public abstract class MemoryFileStoreWrite<T> extends
AbstractFileStoreWrite<T>
FileStoreScan scan,
CoreOptions options,
@Nullable IndexMaintainer.Factory<T> indexFactory,
- String tableName,
- FileStorePathFactory pathFactory) {
+ String tableName) {
super(
commitUser,
snapshotManager,
scan,
indexFactory,
tableName,
- pathFactory,
options.writeMaxWritersToSpill());
this.options = options;
this.cacheManager = new CacheManager(options.lookupCacheMaxMemory());
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactTimer.java
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactTimer.java
new file mode 100644
index 000000000..47036218f
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactTimer.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation.metrics;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.utils.Preconditions;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.LinkedList;
+
+/**
+ * A timer which supports the following operations in O(1) amortized time
complexity.
+ *
+ * <ul>
+ * <li>Start the timer.
+ * <li>Stop the timer.
+ * <li>Query how long the timer is running in the last
<code>queryLengthMillis</code>
+ * milliseconds, where <code>queryLengthMillis</code> is a constant.
+ * </ul>
+ */
+@ThreadSafe
+public class CompactTimer {
+
+ private final long queryLengthMillis;
+ private final LinkedList<TimeInterval> intervals;
+ // innerSum is the total length of intervals, except the first and the
last one
+ private long innerSum;
+ private long lastCallMillis;
+
+ public CompactTimer(long queryLengthMillis) {
+ this.queryLengthMillis = queryLengthMillis;
+ this.intervals = new LinkedList<>();
+ this.innerSum = 0;
+ this.lastCallMillis = -1;
+ }
+
+ public void start() {
+ start(System.currentTimeMillis());
+ }
+
+ @VisibleForTesting
+ void start(long millis) {
+ synchronized (intervals) {
+ removeExpiredIntervals(millis - queryLengthMillis);
+ Preconditions.checkArgument(
+ intervals.isEmpty() || intervals.getLast().finished(),
+ "There is an unfinished interval. This is unexpected.");
+ Preconditions.checkArgument(lastCallMillis <= millis, "millis must
not decrease.");
+ lastCallMillis = millis;
+
+ if (intervals.size() > 1) {
+ innerSum += intervals.getLast().totalLength();
+ }
+ intervals.add(new TimeInterval(millis));
+ }
+ }
+
+ public void finish() {
+ finish(System.currentTimeMillis());
+ }
+
+ @VisibleForTesting
+ void finish(long millis) {
+ synchronized (intervals) {
+ removeExpiredIntervals(millis - queryLengthMillis);
+ Preconditions.checkArgument(
+ intervals.size() > 0 && !intervals.getLast().finished(),
+ "There is no unfinished interval. This is unexpected.");
+ Preconditions.checkArgument(lastCallMillis <= millis, "millis must
not decrease.");
+ lastCallMillis = millis;
+
+ intervals.getLast().finish(millis);
+ }
+ }
+
+ public long calculateLength() {
+ return calculateLength(System.currentTimeMillis());
+ }
+
+ @VisibleForTesting
+ long calculateLength(long toMillis) {
+ synchronized (intervals) {
+ Preconditions.checkArgument(lastCallMillis <= toMillis, "millis
must not decrease.");
+ lastCallMillis = toMillis;
+
+ long fromMillis = toMillis - queryLengthMillis;
+ removeExpiredIntervals(fromMillis);
+
+ if (intervals.isEmpty()) {
+ return 0;
+ } else if (intervals.size() == 1) {
+ return intervals.getFirst().calculateLength(fromMillis,
toMillis);
+ } else {
+ // only the first and the last interval may not be complete,
+ // so we calculate them separately
+ return innerSum
+ + intervals.getFirst().calculateLength(fromMillis,
toMillis)
+ + intervals.getLast().calculateLength(fromMillis,
toMillis);
+ }
+ }
+ }
+
+ private void removeExpiredIntervals(long expireMillis) {
+ while (intervals.size() > 0
+ && intervals.getFirst().finished()
+ && intervals.getFirst().finishMillis <= expireMillis) {
+ intervals.removeFirst();
+ if (intervals.size() > 1) {
+ innerSum -= intervals.getFirst().totalLength();
+ }
+ }
+ }
+
+ private static class TimeInterval {
+
+ private final long startMillis;
+ private Long finishMillis;
+
+ private TimeInterval(long startMillis) {
+ this.startMillis = startMillis;
+ this.finishMillis = null;
+ }
+
+ private void finish(long finishMillis) {
+ this.finishMillis = finishMillis;
+ }
+
+ private boolean finished() {
+ return finishMillis != null;
+ }
+
+ private long totalLength() {
+ return finishMillis - startMillis;
+ }
+
+ private long calculateLength(long fromMillis, long toMillis) {
+ if (finishMillis == null) {
+ return toMillis - Math.min(Math.max(startMillis, fromMillis),
toMillis);
+ } else {
+ long l = Math.max(fromMillis, startMillis);
+ long r = Math.min(toMillis, finishMillis);
+ return Math.max(0, r - l);
+ }
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java
index eca2053d4..4ca54a6c3 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionMetrics.java
@@ -19,21 +19,36 @@
package org.apache.paimon.operation.metrics;
import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.metrics.Histogram;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.metrics.MetricGroup;
import org.apache.paimon.metrics.MetricRegistry;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.DoubleStream;
+import java.util.stream.LongStream;
+
/** Metrics to measure a compaction. */
public class CompactionMetrics {
- private static final int HISTOGRAM_WINDOW_SIZE = 100;
private static final String GROUP_NAME = "compaction";
+ public static final String MAX_LEVEL0_FILE_COUNT = "maxLevel0FileCount";
+ public static final String AVG_LEVEL0_FILE_COUNT = "avgLevel0FileCount";
+ public static final String COMPACTION_THREAD_BUSY = "compactionThreadBusy";
+ private static final long BUSY_MEASURE_MILLIS = 60_000;
+
private final MetricGroup metricGroup;
+ private final Map<PartitionAndBucket, ReporterImpl> reporters;
+ private final Map<Long, CompactTimer> compactTimers;
+
+ public CompactionMetrics(MetricRegistry registry, String tableName) {
+ this.metricGroup = registry.tableMetricGroup(GROUP_NAME, tableName);
+ this.reporters = new HashMap<>();
+ this.compactTimers = new ConcurrentHashMap<>();
- public CompactionMetrics(
- MetricRegistry registry, String tableName, String partition, int
bucket) {
- this.metricGroup = registry.bucketMetricGroup(GROUP_NAME, tableName,
partition, bucket);
registerGenericCompactionMetrics();
}
@@ -42,78 +57,97 @@ public class CompactionMetrics {
return metricGroup;
}
- private Histogram durationHistogram;
- private CompactionStats latestCompaction;
- private long level0FileCount = -1;
+ private void registerGenericCompactionMetrics() {
+ metricGroup.gauge(MAX_LEVEL0_FILE_COUNT, () ->
getLevel0FileCountStream().max().orElse(-1));
+ metricGroup.gauge(
+ AVG_LEVEL0_FILE_COUNT, () ->
getLevel0FileCountStream().average().orElse(-1));
- @VisibleForTesting static final String LAST_COMPACTION_DURATION =
"lastCompactionDuration";
- @VisibleForTesting static final String COMPACTION_DURATION =
"compactionDuration";
+ metricGroup.gauge(COMPACTION_THREAD_BUSY, () ->
getCompactBusyStream().sum());
+ }
- @VisibleForTesting
- static final String LAST_TABLE_FILES_COMPACTED_BEFORE =
"lastTableFilesCompactedBefore";
+ private LongStream getLevel0FileCountStream() {
+ return reporters.values().stream().mapToLong(r -> r.level0FileCount);
+ }
- @VisibleForTesting
- static final String LAST_TABLE_FILES_COMPACTED_AFTER =
"lastTableFilesCompactedAfter";
+ private DoubleStream getCompactBusyStream() {
+ return compactTimers.values().stream()
+ .mapToDouble(t -> 100.0 * t.calculateLength() /
BUSY_MEASURE_MILLIS);
+ }
- @VisibleForTesting
- static final String LAST_CHANGELOG_FILES_COMPACTED =
"lastChangelogFilesCompacted";
+ public void close() {
+ metricGroup.close();
+ }
- @VisibleForTesting
- static final String LAST_REWRITE_INPUT_FILE_SIZE =
"lastRewriteInputFileSize";
+ /** Report metrics value to the {@link CompactionMetrics} object. */
+ public interface Reporter {
- @VisibleForTesting
- static final String LAST_REWRITE_OUTPUT_FILE_SIZE =
"lastRewriteOutputFileSize";
+ CompactTimer getCompactTimer();
- @VisibleForTesting
- static final String LAST_REWRITE_CHANGELOG_FILE_SIZE =
"lastRewriteChangelogFileSize";
+ void reportLevel0FileCount(long count);
- @VisibleForTesting static final String LEVEL_0_FILE_COUNT =
"level0FileCount";
-
- private void registerGenericCompactionMetrics() {
- metricGroup.gauge(
- LAST_COMPACTION_DURATION,
- () -> latestCompaction == null ? 0L :
latestCompaction.getDuration());
- durationHistogram = metricGroup.histogram(COMPACTION_DURATION,
HISTOGRAM_WINDOW_SIZE);
- metricGroup.gauge(
- LAST_TABLE_FILES_COMPACTED_BEFORE,
- () ->
- latestCompaction == null
- ? 0L
- :
latestCompaction.getCompactedDataFilesBefore());
- metricGroup.gauge(
- LAST_TABLE_FILES_COMPACTED_AFTER,
- () ->
- latestCompaction == null
- ? 0L
- :
latestCompaction.getCompactedDataFilesAfter());
- metricGroup.gauge(
- LAST_CHANGELOG_FILES_COMPACTED,
- () -> latestCompaction == null ? 0L :
latestCompaction.getCompactedChangelogs());
- metricGroup.gauge(
- LAST_REWRITE_INPUT_FILE_SIZE,
- () -> latestCompaction == null ? 0L :
latestCompaction.getRewriteInputFileSize());
- metricGroup.gauge(
- LAST_REWRITE_OUTPUT_FILE_SIZE,
- () -> latestCompaction == null ? 0L :
latestCompaction.getRewriteOutputFileSize());
- metricGroup.gauge(
- LAST_REWRITE_CHANGELOG_FILE_SIZE,
- () ->
- latestCompaction == null
- ? 0L
- :
latestCompaction.getRewriteChangelogFileSize());
- metricGroup.gauge(LEVEL_0_FILE_COUNT, () -> level0FileCount);
+ void unregister();
}
- public void reportCompaction(CompactionStats compactionStats) {
- latestCompaction = compactionStats;
- durationHistogram.update(compactionStats.getDuration());
+ private class ReporterImpl implements Reporter {
+
+ private final PartitionAndBucket key;
+ private long level0FileCount;
+
+ private ReporterImpl(PartitionAndBucket key) {
+ this.key = key;
+ this.level0FileCount = 0;
+ }
+
+ @Override
+ public CompactTimer getCompactTimer() {
+ return compactTimers.computeIfAbsent(
+ Thread.currentThread().getId(),
+ ignore -> new CompactTimer(BUSY_MEASURE_MILLIS));
+ }
+
+ @Override
+ public void reportLevel0FileCount(long count) {
+ this.level0FileCount = count;
+ }
+
+ @Override
+ public void unregister() {
+ reporters.remove(key);
+ }
}
- public void reportLevel0FileCount(long count) {
- this.level0FileCount = count;
+ public Reporter createReporter(BinaryRow partition, int bucket) {
+ PartitionAndBucket key = new PartitionAndBucket(partition, bucket);
+ ReporterImpl reporter = new ReporterImpl(key);
+ reporters.put(key, reporter);
+ return reporter;
}
- public void close() {
- metricGroup.close();
+ private static class PartitionAndBucket {
+
+ private final BinaryRow partition;
+ private final int bucket;
+
+ private PartitionAndBucket(BinaryRow partition, int bucket) {
+ this.partition = partition;
+ this.bucket = bucket;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(partition, bucket);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof PartitionAndBucket)) {
+ return false;
+ }
+ PartitionAndBucket other = (PartitionAndBucket) o;
+ return Objects.equals(partition, other.partition) && bucket ==
other.bucket;
+ }
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionStats.java
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionStats.java
deleted file mode 100644
index ba71a2dad..000000000
---
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CompactionStats.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.operation.metrics;
-
-import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.io.DataFileMeta;
-
-import java.util.List;
-
-/** Statistics for a compaction. */
-public class CompactionStats {
- private final long duration;
- private final long compactedDataFilesBefore;
- private final long compactedDataFilesAfter;
- private final long compactedChangelogs;
- private final long rewriteInputFileSize;
- private final long rewriteOutputFileSize;
- private final long rewriteChangelogFileSize;
-
- public CompactionStats(
- long compactionDuration,
- List<DataFileMeta> compactBefore,
- List<DataFileMeta> compactAfter,
- List<DataFileMeta> compactChangelog) {
- this.duration = compactionDuration;
- this.compactedDataFilesBefore = compactBefore.size();
- this.compactedDataFilesAfter = compactAfter.size();
- this.compactedChangelogs = compactChangelog.size();
- this.rewriteInputFileSize = rewriteFileSize(compactBefore);
- this.rewriteOutputFileSize = rewriteFileSize(compactAfter);
- this.rewriteChangelogFileSize = rewriteFileSize(compactChangelog);
- }
-
- @VisibleForTesting
- protected long getDuration() {
- return duration;
- }
-
- protected long getCompactedDataFilesBefore() {
- return compactedDataFilesBefore;
- }
-
- protected long getCompactedDataFilesAfter() {
- return compactedDataFilesAfter;
- }
-
- protected long getCompactedChangelogs() {
- return compactedChangelogs;
- }
-
- protected long getRewriteInputFileSize() {
- return rewriteInputFileSize;
- }
-
- protected long getRewriteOutputFileSize() {
- return rewriteOutputFileSize;
- }
-
- protected long getRewriteChangelogFileSize() {
- return rewriteChangelogFileSize;
- }
-
- private long rewriteFileSize(List<DataFileMeta> files) {
- return files.stream().mapToLong(DataFileMeta::fileSize).sum();
- }
-
- @Override
- public String toString() {
- return "CompactionStats{"
- + "duration="
- + duration
- + ", compactedDataFilesBefore="
- + compactedDataFilesBefore
- + ", compactedDataFilesAfter="
- + compactedDataFilesAfter
- + ", compactedChangelogs="
- + compactedChangelogs
- + ", rewriteInputFileSize="
- + rewriteInputFileSize
- + ", rewriteOutputFileSize="
- + rewriteOutputFileSize
- + ", rewriteChangelogFileSize="
- + rewriteChangelogFileSize
- + '}';
- }
-}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/MetricUtils.java
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/MetricUtils.java
new file mode 100644
index 000000000..cc49d5a62
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/MetricUtils.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation.metrics;
+
+import org.slf4j.Logger;
+
+/** Utils for metrics. */
+public class MetricUtils {
+
+ public static void safeCall(Runnable runnable, Logger logger) {
+ try {
+ runnable.run();
+ } catch (Throwable t) {
+ logger.warn("Exception occurs when reporting metrics", t);
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterBufferMetric.java
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterBufferMetric.java
index 3a0c2e87b..a89531bc4 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterBufferMetric.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterBufferMetric.java
@@ -33,7 +33,7 @@ public class WriterBufferMetric {
private static final String USED_WRITE_BUFFER_SIZE =
"usedWriteBufferSizeByte";
private static final String TOTAL_WRITE_BUFFER_SIZE =
"totalWriteBufferSizeByte";
- private MetricGroup metricGroup;
+ private final MetricGroup metricGroup;
public WriterBufferMetric(
Supplier<MemoryPoolFactory> memoryPoolFactorySupplier,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterMetrics.java
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterMetrics.java
deleted file mode 100644
index 2bd0b940b..000000000
---
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterMetrics.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.operation.metrics;
-
-import org.apache.paimon.metrics.Counter;
-import org.apache.paimon.metrics.Histogram;
-import org.apache.paimon.metrics.MetricGroup;
-import org.apache.paimon.metrics.MetricRegistry;
-
-/** Metrics for writer. */
-public class WriterMetrics {
-
- private static final String GROUP_NAME = "writer";
-
- private static final int WINDOW_SAMPLE_SIZE = 100;
-
- private static final String WRITE_RECORD_NUM = "writeRecordCount";
- private static final String FLUSH_COST_MILLIS = "flushCostMillis";
- public static final String PREPARE_COMMIT_COST_MILLIS =
"prepareCommitCostMillis";
-
- private final Counter writeRecordNumCounter;
-
- private final Histogram bufferFlushCostMillis;
-
- private final Histogram prepareCommitCostMillis;
-
- private final MetricGroup metricGroup;
-
- public WriterMetrics(MetricRegistry registry, String tableName, String
partition, int bucket) {
- metricGroup = registry.bucketMetricGroup(GROUP_NAME, tableName,
partition, bucket);
- writeRecordNumCounter = metricGroup.counter(WRITE_RECORD_NUM);
- bufferFlushCostMillis = metricGroup.histogram(FLUSH_COST_MILLIS,
WINDOW_SAMPLE_SIZE);
- prepareCommitCostMillis =
- metricGroup.histogram(PREPARE_COMMIT_COST_MILLIS,
WINDOW_SAMPLE_SIZE);
- }
-
- public void incWriteRecordNum() {
- writeRecordNumCounter.inc();
- }
-
- public void updateBufferFlushCostMillis(long bufferFlushCost) {
- bufferFlushCostMillis.update(bufferFlushCost);
- }
-
- public void updatePrepareCommitCostMillis(long cost) {
- this.prepareCommitCostMillis.update(cost);
- }
-
- public void close() {
- metricGroup.close();
- }
-}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 5ab6b8537..c954aec1f 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -602,8 +602,7 @@ public class AppendOnlyWriterTest {
spillable,
CoreOptions.FILE_COMPRESSION.defaultValue(),
StatsCollectorFactories.createStatsFactories(
- options,
AppendOnlyWriterTest.SCHEMA.getFieldNames()),
- null);
+ options,
AppendOnlyWriterTest.SCHEMA.getFieldNames()));
writer.setMemoryPool(
new HeapMemorySegmentPool(options.writeBufferSize(),
options.pageSize()));
return Pair.of(writer, compactManager.allFiles());
diff --git
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index 219609ffb..dcc02cde6 100644
---
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -85,8 +85,7 @@ public class FileFormatSuffixTest extends
KeyValueFileReadWriteTest {
false,
CoreOptions.FILE_COMPRESSION.defaultValue(),
StatsCollectorFactories.createStatsFactories(
- options, SCHEMA.getFieldNames()),
- null);
+ options, SCHEMA.getFieldNames()));
appendOnlyWriter.setMemoryPool(
new HeapMemorySegmentPool(options.writeBufferSize(),
options.pageSize()));
appendOnlyWriter.write(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index d3cfc2af7..1aa708306 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -426,7 +426,6 @@ public abstract class MergeTreeTestBase {
options.commitForceCompact(),
ChangelogProducer.NONE,
null,
- null,
null);
writer.setMemoryPool(
new HeapMemorySegmentPool(options.writeBufferSize(),
options.pageSize()));
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactTimerTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactTimerTest.java
new file mode 100644
index 000000000..44b15715b
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactTimerTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation.metrics;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link CompactTimer}. */
+public class CompactTimerTest {
+
+ @Test
+ public void testRandom() {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ int totalLength = random.nextInt(1, 1000);
+ int queryLength;
+ if (random.nextBoolean() && totalLength > 100) {
+ queryLength = random.nextInt(1, 20);
+ } else {
+ queryLength = random.nextInt(1, totalLength + 1);
+ }
+ boolean[] running = new boolean[totalLength];
+ CompactTimer timer = new CompactTimer(queryLength);
+
+ boolean now = false;
+ for (int i = 0; i < totalLength; i++) {
+ if (random.nextInt(10) == 0) {
+ // change state
+ now = !now;
+ if (now) {
+ timer.start(i);
+ } else {
+ timer.finish(i);
+ }
+ }
+ running[i] = now;
+
+ if (random.nextInt(10) == 0) {
+ // query
+ int expected = 0;
+ for (int j = 1; j <= queryLength; j++) {
+ if (i - j < 0) {
+ break;
+ }
+ if (running[i - j]) {
+ expected += 1;
+ }
+ }
+ assertThat(timer.calculateLength(i)).isEqualTo(expected);
+ }
+ }
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactionMetricsTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactionMetricsTest.java
index 507a2b4fe..b18250c9e 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactionMetricsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CompactionMetricsTest.java
@@ -18,166 +18,45 @@
package org.apache.paimon.operation.metrics;
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.io.DataFileTestUtils;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.metrics.Gauge;
-import org.apache.paimon.metrics.Histogram;
-import org.apache.paimon.metrics.Metric;
import org.apache.paimon.metrics.MetricRegistryImpl;
import org.junit.jupiter.api.Test;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.offset;
/** Tests for {@link CompactionMetrics}. */
public class CompactionMetricsTest {
- private static final String TABLE_NAME = "myTable";
- private static final String PARTITION = "date=20230623";
- private static final int BUCKET = 5;
-
- /** Tests that the metrics are updated properly. */
- @SuppressWarnings("unchecked")
@Test
- public void testMetricsAreUpdated() {
- CompactionMetrics compactionMetrics = getCompactionMetrics();
- Map<String, Metric> registeredGenericMetrics =
- compactionMetrics.getMetricGroup().getMetrics();
-
- // Check initial values
- Gauge<Long> lastCompactionDuration =
- (Gauge<Long>)
-
registeredGenericMetrics.get(CompactionMetrics.LAST_COMPACTION_DURATION);
- Histogram compactionDuration =
- (Histogram)
registeredGenericMetrics.get(CompactionMetrics.COMPACTION_DURATION);
- Gauge<Long> lastTableFilesCompactedBefore =
- (Gauge<Long>)
- registeredGenericMetrics.get(
-
CompactionMetrics.LAST_TABLE_FILES_COMPACTED_BEFORE);
- Gauge<Long> lastTableFilesCompactedAfter =
- (Gauge<Long>)
- registeredGenericMetrics.get(
-
CompactionMetrics.LAST_TABLE_FILES_COMPACTED_AFTER);
- Gauge<Long> lastChangelogFilesCompacted =
- (Gauge<Long>)
- registeredGenericMetrics.get(
-
CompactionMetrics.LAST_CHANGELOG_FILES_COMPACTED);
- Gauge<Long> lastRewriteInputFileSize =
- (Gauge<Long>)
- registeredGenericMetrics.get(
-
CompactionMetrics.LAST_REWRITE_INPUT_FILE_SIZE);
- Gauge<Long> lastRewriteOutputFileSize =
- (Gauge<Long>)
- registeredGenericMetrics.get(
-
CompactionMetrics.LAST_REWRITE_OUTPUT_FILE_SIZE);
- Gauge<Long> lastRewriteChangelogFileSize =
- (Gauge<Long>)
- registeredGenericMetrics.get(
-
CompactionMetrics.LAST_REWRITE_CHANGELOG_FILE_SIZE);
- Gauge<Long> level0FileCount =
- (Gauge<Long>)
registeredGenericMetrics.get(CompactionMetrics.LEVEL_0_FILE_COUNT);
-
- assertThat(lastCompactionDuration.getValue()).isEqualTo(0);
- assertThat(compactionDuration.getCount()).isEqualTo(0);
- assertThat(compactionDuration.getStatistics().size()).isEqualTo(0);
- assertThat(lastTableFilesCompactedBefore.getValue()).isEqualTo(0);
- assertThat(lastTableFilesCompactedAfter.getValue()).isEqualTo(0);
- assertThat(lastChangelogFilesCompacted.getValue()).isEqualTo(0);
- assertThat(lastRewriteInputFileSize.getValue()).isEqualTo(0);
- assertThat(lastRewriteOutputFileSize.getValue()).isEqualTo(0);
- assertThat(lastRewriteChangelogFileSize.getValue()).isEqualTo(0);
-
- // report once
- reportOnce(compactionMetrics);
-
- // generic metrics value updated
- assertThat(lastCompactionDuration.getValue()).isEqualTo(3000);
- assertThat(compactionDuration.getCount()).isEqualTo(1);
- assertThat(compactionDuration.getStatistics().size()).isEqualTo(1);
-
assertThat(compactionDuration.getStatistics().getValues()[0]).isEqualTo(3000L);
-
assertThat(compactionDuration.getStatistics().getMin()).isEqualTo(3000);
- assertThat(compactionDuration.getStatistics().getQuantile(0.5))
- .isCloseTo(3000.0, offset(0.001));
-
assertThat(compactionDuration.getStatistics().getMean()).isEqualTo(3000);
-
assertThat(compactionDuration.getStatistics().getMax()).isEqualTo(3000);
-
assertThat(compactionDuration.getStatistics().getStdDev()).isEqualTo(0);
- assertThat(lastTableFilesCompactedBefore.getValue()).isEqualTo(2);
- assertThat(lastTableFilesCompactedAfter.getValue()).isEqualTo(2);
- assertThat(lastChangelogFilesCompacted.getValue()).isEqualTo(2);
- assertThat(lastRewriteInputFileSize.getValue()).isEqualTo(2001);
- assertThat(lastRewriteOutputFileSize.getValue()).isEqualTo(1101);
- assertThat(lastRewriteChangelogFileSize.getValue()).isEqualTo(3001);
-
- // report again
- reportAgain(compactionMetrics);
-
- // generic metrics value updated
- assertThat(lastCompactionDuration.getValue()).isEqualTo(6000);
- assertThat(compactionDuration.getCount()).isEqualTo(2);
- assertThat(compactionDuration.getStatistics().size()).isEqualTo(2);
-
assertThat(compactionDuration.getStatistics().getValues()[1]).isEqualTo(6000L);
-
assertThat(compactionDuration.getStatistics().getMin()).isEqualTo(3000);
- assertThat(compactionDuration.getStatistics().getQuantile(0.5))
- .isCloseTo(4500, offset(0.001));
-
assertThat(compactionDuration.getStatistics().getMean()).isEqualTo(4500);
-
assertThat(compactionDuration.getStatistics().getMax()).isEqualTo(6000);
- assertThat(compactionDuration.getStatistics().getStdDev())
- .isCloseTo(2121.320, offset(0.001));
- assertThat(lastTableFilesCompactedBefore.getValue()).isEqualTo(2);
- assertThat(lastTableFilesCompactedAfter.getValue()).isEqualTo(2);
- assertThat(lastChangelogFilesCompacted.getValue()).isEqualTo(2);
- assertThat(lastRewriteInputFileSize.getValue()).isEqualTo(2001);
- assertThat(lastRewriteOutputFileSize.getValue()).isEqualTo(1201);
- assertThat(lastRewriteChangelogFileSize.getValue()).isEqualTo(2501);
-
- // test level0FileCount
- compactionMetrics.reportLevel0FileCount(10);
- assertThat(level0FileCount.getValue()).isEqualTo(10);
-
- compactionMetrics.reportLevel0FileCount(20);
- assertThat(level0FileCount.getValue()).isEqualTo(20);
- }
-
- private void reportOnce(CompactionMetrics compactionMetrics) {
- List<DataFileMeta> compactBefore = new ArrayList<>();
- List<DataFileMeta> compactAfter = new ArrayList<>();
- List<DataFileMeta> compactChangelog = new ArrayList<>();
- compactBefore.add(DataFileTestUtils.newFile(0, 1000));
- compactBefore.add(DataFileTestUtils.newFile(1001, 2000));
- compactAfter.add(DataFileTestUtils.newFile(400, 1000));
- compactAfter.add(DataFileTestUtils.newFile(1001, 1500));
- compactChangelog.add(DataFileTestUtils.newFile(0, 2000));
- compactChangelog.add(DataFileTestUtils.newFile(2001, 3000));
-
- CompactionStats compactionStats =
- new CompactionStats(3000, compactBefore, compactAfter,
compactChangelog);
-
- compactionMetrics.reportCompaction(compactionStats);
- }
-
- private void reportAgain(CompactionMetrics compactionMetrics) {
- List<DataFileMeta> compactBefore = new ArrayList<>();
- List<DataFileMeta> compactAfter = new ArrayList<>();
- List<DataFileMeta> compactChangelog = new ArrayList<>();
- compactBefore.add(DataFileTestUtils.newFile(2000, 3000));
- compactBefore.add(DataFileTestUtils.newFile(3001, 4000));
- compactAfter.add(DataFileTestUtils.newFile(600, 1200));
- compactAfter.add(DataFileTestUtils.newFile(1201, 1800));
- compactChangelog.add(DataFileTestUtils.newFile(0, 1500));
- compactChangelog.add(DataFileTestUtils.newFile(1501, 2500));
-
- CompactionStats compactionStats =
- new CompactionStats(6000, compactBefore, compactAfter,
compactChangelog);
-
- compactionMetrics.reportCompaction(compactionStats);
+ public void testReportMetrics() {
+ CompactionMetrics metrics = new CompactionMetrics(new
MetricRegistryImpl(), "myTable");
+ assertThat(getMetric(metrics,
CompactionMetrics.MAX_LEVEL0_FILE_COUNT)).isEqualTo(-1L);
+ assertThat(getMetric(metrics,
CompactionMetrics.AVG_LEVEL0_FILE_COUNT)).isEqualTo(-1.0);
+ assertThat(getMetric(metrics,
CompactionMetrics.COMPACTION_THREAD_BUSY)).isEqualTo(0.0);
+
+ CompactionMetrics.Reporter[] reporters = new
CompactionMetrics.Reporter[3];
+ for (int i = 0; i < reporters.length; i++) {
+ reporters[i] = metrics.createReporter(BinaryRow.EMPTY_ROW, i);
+ }
+
+ assertThat(getMetric(metrics,
CompactionMetrics.MAX_LEVEL0_FILE_COUNT)).isEqualTo(0L);
+ assertThat(getMetric(metrics,
CompactionMetrics.AVG_LEVEL0_FILE_COUNT)).isEqualTo(0.0);
+ assertThat(getMetric(metrics,
CompactionMetrics.COMPACTION_THREAD_BUSY)).isEqualTo(0.0);
+
+ reporters[0].reportLevel0FileCount(5);
+ reporters[1].reportLevel0FileCount(3);
+ reporters[2].reportLevel0FileCount(4);
+ assertThat(getMetric(metrics,
CompactionMetrics.MAX_LEVEL0_FILE_COUNT)).isEqualTo(5L);
+ assertThat(getMetric(metrics,
CompactionMetrics.AVG_LEVEL0_FILE_COUNT)).isEqualTo(4.0);
+
+ reporters[0].reportLevel0FileCount(8);
+ assertThat(getMetric(metrics,
CompactionMetrics.MAX_LEVEL0_FILE_COUNT)).isEqualTo(8L);
+ assertThat(getMetric(metrics,
CompactionMetrics.AVG_LEVEL0_FILE_COUNT)).isEqualTo(5.0);
}
- private CompactionMetrics getCompactionMetrics() {
- return new CompactionMetrics(new MetricRegistryImpl(), TABLE_NAME,
PARTITION, BUCKET);
+ private Object getMetric(CompactionMetrics metrics, String metricName) {
+ return ((Gauge<?>)
metrics.getMetricGroup().getMetrics().get(metricName)).getValue();
}
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperatorTest.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperatorTest.java
deleted file mode 100644
index c72a1f637..000000000
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperatorTest.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.sink.cdc;
-
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.flink.sink.Committable;
-import org.apache.paimon.flink.sink.CommittableTypeInfo;
-import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
-import org.apache.paimon.flink.utils.MetricUtils;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.schema.SchemaUtils;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.FileStoreTableFactory;
-import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.RowKind;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.TraceableFileIO;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.state.JavaSerializer;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-
-import java.time.Duration;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.function.Predicate;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Tests for {@link CdcRecordStoreWriteOperator}. */
-public class CdcDynamicBucketWriteOperatorTest {
-
- @TempDir java.nio.file.Path tempDir;
-
- private Path tablePath;
- private String commitUser;
-
- @BeforeEach
- public void before() {
- tablePath = new Path(TraceableFileIO.SCHEME + "://" +
tempDir.toString());
- commitUser = UUID.randomUUID().toString();
- }
-
- @AfterEach
- public void after() {
- // assert all connections are closed
- Predicate<Path> pathPredicate = path ->
path.toString().contains(tempDir.toString());
- assertThat(TraceableFileIO.openInputStreams(pathPredicate)).isEmpty();
- assertThat(TraceableFileIO.openOutputStreams(pathPredicate)).isEmpty();
- }
-
- @Test
- public void testCompactionMetrics() throws Exception {
- RowType rowType =
- RowType.of(
- new DataType[] {DataTypes.INT(), DataTypes.INT()},
- new String[] {"pk", "col1"});
- FileStoreTable table =
- createFileStoreTable(
- rowType, Collections.emptyList(),
Collections.singletonList("pk"));
- OneInputStreamOperatorTestHarness<Tuple2<CdcRecord, Integer>,
Committable> harness =
- createTestHarness(table);
- CdcDynamicBucketWriteOperator operator =
- (CdcDynamicBucketWriteOperator) harness.getOneInputOperator();
- harness.open();
-
- MetricGroup compactionMetricGroup =
- operator.getMetricGroup()
- .addGroup("paimon")
- .addGroup("table", table.name())
- .addGroup("partition", "_")
- .addGroup("bucket", "0")
- .addGroup("compaction");
-
- long timestamp = 0;
- long cpId = 1L;
- Map<String, String> fields = new HashMap<>();
- fields.put("pk", "1");
- fields.put("col1", "2");
- harness.processElement(Tuple2.of(new CdcRecord(RowKind.INSERT,
fields), 0), timestamp++);
- operator.getWrite().compact(BinaryRow.EMPTY_ROW, 0, true);
- operator.getWrite().prepareCommit(true, cpId++);
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedBefore")
- .getValue())
- .isEqualTo(1L);
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedAfter")
- .getValue())
- .isEqualTo(1L);
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastChangelogFilesCompacted")
- .getValue())
- .isEqualTo(0L);
-
- fields.put("pk", "1");
- fields.put("col1", "3");
- harness.processElement(Tuple2.of(new CdcRecord(RowKind.INSERT,
fields), 0), timestamp);
- operator.getWrite().compact(BinaryRow.EMPTY_ROW, 0, true);
- operator.getWrite().prepareCommit(true, cpId);
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedBefore")
- .getValue())
- .isEqualTo(2L);
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedAfter")
- .getValue())
- .isEqualTo(1L);
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastChangelogFilesCompacted")
- .getValue())
- .isEqualTo(0L);
-
- // operator closed, metric groups should be unregistered
- harness.close();
- assertThat(MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedBefore"))
- .isNull();
- assertThat(MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedAfter"))
- .isNull();
- assertThat(MetricUtils.getGauge(compactionMetricGroup,
"lastChangelogFilesCompacted"))
- .isNull();
- }
-
- private OneInputStreamOperatorTestHarness<Tuple2<CdcRecord, Integer>,
Committable>
- createTestHarness(FileStoreTable table) throws Exception {
- CdcDynamicBucketWriteOperator operator =
- new CdcDynamicBucketWriteOperator(
- table,
- (t, commitUser, state, ioManager, memoryPool,
metricGroup) ->
- new StoreSinkWriteImpl(
- t,
- commitUser,
- state,
- ioManager,
- false,
- false,
- true,
- memoryPool,
- metricGroup),
- commitUser);
- TypeSerializer<Tuple2<CdcRecord, Integer>> inputSerializer = new
JavaSerializer<>();
- TypeSerializer<Committable> outputSerializer =
- new CommittableTypeInfo().createSerializer(new
ExecutionConfig());
- OneInputStreamOperatorTestHarness<Tuple2<CdcRecord, Integer>,
Committable> harness =
- new OneInputStreamOperatorTestHarness<>(operator,
inputSerializer);
- harness.setup(outputSerializer);
- return harness;
- }
-
- private FileStoreTable createFileStoreTable(
- RowType rowType, List<String> partitions, List<String>
primaryKeys) throws Exception {
- Options conf = new Options();
- conf.set(CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME,
Duration.ofMillis(10));
-
- TableSchema tableSchema =
- SchemaUtils.forceCommit(
- new SchemaManager(LocalFileIO.create(), tablePath),
- new Schema(rowType.getFields(), partitions,
primaryKeys, conf.toMap(), ""));
- return FileStoreTableFactory.create(LocalFileIO.create(), tablePath,
tableSchema);
- }
-}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
index ff4132103..08d5fa72c 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
@@ -27,7 +27,6 @@ import org.apache.paimon.flink.sink.MultiTableCommittable;
import org.apache.paimon.flink.sink.MultiTableCommittableTypeInfo;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
-import org.apache.paimon.flink.utils.MetricUtils;
import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.AbstractFileStoreWrite;
import org.apache.paimon.options.CatalogOptions;
@@ -46,7 +45,6 @@ import org.apache.paimon.utils.TraceableFileIO;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -71,7 +69,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
-import static org.apache.paimon.io.DataFileTestUtils.row;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link CdcRecordStoreMultiWriteOperator}. */
@@ -710,91 +707,6 @@ public class CdcRecordStoreMultiWriteOperatorTest {
harness.close();
}
- @Test
- public void testSingleTableCompactionMetrics() throws Exception {
- Identifier tableId = firstTable;
- FileStoreTable table = (FileStoreTable) catalog.getTable(tableId);
-
- OneInputStreamOperatorTestHarness<CdcMultiplexRecord,
MultiTableCommittable> testHarness =
- createTestHarness(catalogLoader);
-
- testHarness.open();
-
- CdcRecordStoreMultiWriteOperator operator =
- (CdcRecordStoreMultiWriteOperator)
testHarness.getOneInputOperator();
-
- MetricGroup compactionMetricGroup =
- operator.getMetricGroup()
- .addGroup("paimon")
- .addGroup("table", table.name())
- .addGroup("partition", "pt=0")
- .addGroup("bucket", "0")
- .addGroup("compaction");
-
- long cpId = 1L;
- Map<String, String> fields = new HashMap<>();
- fields.put("pt", "0");
- fields.put("k", "1");
- fields.put("v", "10");
-
- CdcMultiplexRecord record =
- CdcMultiplexRecord.fromCdcRecord(
- databaseName,
- tableId.getObjectName(),
- new CdcRecord(RowKind.INSERT, fields));
-
- testHarness.processElement(record, 0);
- operator.writes().get(tableId).compact(row(0), 0, true);
- operator.writes().get(tableId).prepareCommit(true, cpId++);
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedBefore")
- .getValue())
- .isEqualTo(1L);
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedAfter")
- .getValue())
- .isEqualTo(1L);
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastChangelogFilesCompacted")
- .getValue())
- .isEqualTo(0L);
-
- fields.put("pt", "0");
- fields.put("k", "2");
- fields.put("v", "12");
-
- CdcMultiplexRecord record1 =
- CdcMultiplexRecord.fromCdcRecord(
- databaseName,
- tableId.getObjectName(),
- new CdcRecord(RowKind.INSERT, fields));
-
- testHarness.processElement(record1, 1);
- operator.writes().get(tableId).compact(row(0), 0, true);
- operator.writes().get(tableId).prepareCommit(true, cpId);
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedBefore")
- .getValue())
- .isEqualTo(2L);
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedAfter")
- .getValue())
- .isEqualTo(1L);
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastChangelogFilesCompacted")
- .getValue())
- .isEqualTo(0L);
-
- // operator closed, metric groups should be unregistered
- testHarness.close();
- assertThat(MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedBefore"))
- .isNull();
- assertThat(MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedAfter"))
- .isNull();
- assertThat(MetricUtils.getGauge(compactionMetricGroup,
"lastChangelogFilesCompacted"))
- .isNull();
- }
-
private OneInputStreamOperatorTestHarness<CdcMultiplexRecord,
MultiTableCommittable>
createTestHarness(Catalog.Loader catalogLoader) throws Exception {
CdcRecordStoreMultiWriteOperator operator =
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
index 8d03466f6..9af7eabda 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
@@ -19,11 +19,9 @@
package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.CommittableTypeInfo;
import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
-import org.apache.paimon.flink.utils.MetricUtils;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
@@ -42,7 +40,6 @@ import org.apache.paimon.utils.TraceableFileIO;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.jupiter.api.AfterEach;
@@ -254,78 +251,6 @@ public class CdcRecordStoreWriteOperatorTest {
harness.close();
}
- @Test
- public void testCompactionMetrics() throws Exception {
- RowType rowType =
- RowType.of(
- new DataType[] {DataTypes.INT(), DataTypes.INT()},
- new String[] {"pk", "col1"});
- FileStoreTable table =
- createFileStoreTable(
- rowType, Collections.emptyList(),
Collections.singletonList("pk"));
- OneInputStreamOperatorTestHarness<CdcRecord, Committable> harness =
- createTestHarness(table);
- CdcRecordStoreWriteOperator operator =
- (CdcRecordStoreWriteOperator) harness.getOneInputOperator();
- harness.open();
-
- MetricGroup compactionMetricGroup =
- operator.getMetricGroup()
- .addGroup("paimon")
- .addGroup("table", table.name())
- .addGroup("partition", "_")
- .addGroup("bucket", "0")
- .addGroup("compaction");
-
- long timestamp = 0;
- long cpId = 1L;
- Map<String, String> fields = new HashMap<>();
- fields.put("pk", "1");
- fields.put("col1", "2");
- harness.processElement(new CdcRecord(RowKind.INSERT, fields),
timestamp++);
- operator.getWrite().compact(BinaryRow.EMPTY_ROW, 0, true);
- operator.getWrite().prepareCommit(true, cpId++);
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedBefore")
- .getValue())
- .isEqualTo(1L);
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedAfter")
- .getValue())
- .isEqualTo(1L);
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastChangelogFilesCompacted")
- .getValue())
- .isEqualTo(0L);
-
- fields.put("pk", "1");
- fields.put("col1", "3");
- harness.processElement(new CdcRecord(RowKind.INSERT, fields),
timestamp);
- operator.getWrite().compact(BinaryRow.EMPTY_ROW, 0, true);
- operator.getWrite().prepareCommit(true, cpId);
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedBefore")
- .getValue())
- .isEqualTo(2L);
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedAfter")
- .getValue())
- .isEqualTo(1L);
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastChangelogFilesCompacted")
- .getValue())
- .isEqualTo(0L);
-
- // operator closed, metric groups should be unregistered
- harness.close();
- assertThat(MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedBefore"))
- .isNull();
- assertThat(MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedAfter"))
- .isNull();
- assertThat(MetricUtils.getGauge(compactionMetricGroup,
"lastChangelogFilesCompacted"))
- .isNull();
- }
-
private OneInputStreamOperatorTestHarness<CdcRecord, Committable>
createTestHarness(
FileStoreTable table) throws Exception {
CdcRecordStoreWriteOperator operator =
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index 6c4b46ce3..44ceab84b 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -23,7 +23,7 @@ import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.VersionedSerializerWrapper;
-import org.apache.paimon.flink.utils.MetricUtils;
+import org.apache.paimon.flink.utils.TestingMetricUtils;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.NewFilesIncrement;
@@ -577,14 +577,19 @@ public class CommitterOperatorTest extends
CommitterOperatorTestBase {
.addGroup("paimon")
.addGroup("table", table.name())
.addGroup("commit");
- assertThat(MetricUtils.getGauge(commitMetricGroup,
"lastTableFilesAdded").getValue())
+ assertThat(TestingMetricUtils.getGauge(commitMetricGroup,
"lastTableFilesAdded").getValue())
.isEqualTo(1L);
- assertThat(MetricUtils.getGauge(commitMetricGroup,
"lastTableFilesDeleted").getValue())
+ assertThat(
+ TestingMetricUtils.getGauge(commitMetricGroup,
"lastTableFilesDeleted")
+ .getValue())
.isEqualTo(0L);
- assertThat(MetricUtils.getGauge(commitMetricGroup,
"lastTableFilesAppended").getValue())
+ assertThat(
+ TestingMetricUtils.getGauge(commitMetricGroup,
"lastTableFilesAppended")
+ .getValue())
.isEqualTo(1L);
assertThat(
- MetricUtils.getGauge(commitMetricGroup,
"lastTableFilesCommitCompacted")
+ TestingMetricUtils.getGauge(
+ commitMetricGroup,
"lastTableFilesCommitCompacted")
.getValue())
.isEqualTo(0L);
@@ -602,14 +607,19 @@ public class CommitterOperatorTest extends
CommitterOperatorTestBase {
testHarness.snapshot(cpId, timestamp++);
testHarness.notifyOfCompletedCheckpoint(cpId);
- assertThat(MetricUtils.getGauge(commitMetricGroup,
"lastTableFilesAdded").getValue())
+ assertThat(TestingMetricUtils.getGauge(commitMetricGroup,
"lastTableFilesAdded").getValue())
.isEqualTo(3L);
- assertThat(MetricUtils.getGauge(commitMetricGroup,
"lastTableFilesDeleted").getValue())
+ assertThat(
+ TestingMetricUtils.getGauge(commitMetricGroup,
"lastTableFilesDeleted")
+ .getValue())
.isEqualTo(3L);
- assertThat(MetricUtils.getGauge(commitMetricGroup,
"lastTableFilesAppended").getValue())
+ assertThat(
+ TestingMetricUtils.getGauge(commitMetricGroup,
"lastTableFilesAppended")
+ .getValue())
.isEqualTo(2L);
assertThat(
- MetricUtils.getGauge(commitMetricGroup,
"lastTableFilesCommitCompacted")
+ TestingMetricUtils.getGauge(
+ commitMetricGroup,
"lastTableFilesCommitCompacted")
.getValue())
.isEqualTo(4L);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
index 9a71cc2ea..68c3c42b0 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
@@ -20,22 +20,16 @@ package org.apache.paimon.flink.sink;
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.source.CompactorSourceBuilder;
import org.apache.paimon.flink.util.AbstractTestBase;
-import org.apache.paimon.flink.utils.MetricUtils;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.io.DataFileMetaSerializer;
-import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
@@ -52,12 +46,10 @@ import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.SnapshotManager;
-import org.apache.paimon.utils.TraceableFileIO;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -67,7 +59,6 @@ import org.apache.flink.table.data.RowData;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
import java.util.Arrays;
@@ -187,188 +178,6 @@ public class CompactorSinkITCase extends AbstractTestBase
{
.isEqualTo(sinkParalellism);
}
- @Test
- public void testCompactionMetrics() throws Exception {
- FileStoreTable table = createFileStoreTable();
- prepareDataFile(table);
-
- StoreCompactOperator operator = createCompactOperator(table);
- OneInputStreamOperatorTestHarness<RowData, Committable> testHarness =
- createTestHarness(operator);
- testHarness.open();
-
- MetricGroup compactionMetricGroup =
- operator.getMetricGroup()
- .addGroup("paimon")
- .addGroup("table", table.name())
- .addGroup("partition", "dt=20221208_hh=15")
- .addGroup("bucket", "0")
- .addGroup("compaction");
- DataFileMetaSerializer fileMetaSerializer = new
DataFileMetaSerializer();
- RowData record =
- new FlinkRowData(
- GenericRow.of(
- 1L,
- partition("20221208", 15),
- 0,
-
fileMetaSerializer.serializeList(Collections.emptyList())));
-
- long timestamp = 0;
- testHarness.processElement(record, timestamp++);
- testHarness.endInput();
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedBefore")
- .getValue())
- .isEqualTo(2L);
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedAfter")
- .getValue())
- .isEqualTo(1L);
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastChangelogFilesCompacted")
- .getValue())
- .isEqualTo(0L);
-
- compactionMetricGroup =
- operator.getMetricGroup()
- .addGroup("paimon")
- .addGroup("table", table.name())
- .addGroup("partition", "dt=20221208_hh=16")
- .addGroup("bucket", "0")
- .addGroup("compaction");
- record =
- new FlinkRowData(
- GenericRow.of(
- 2L,
- partition("20221208", 16),
- 0,
-
fileMetaSerializer.serializeList(Collections.emptyList())));
-
- testHarness.processElement(record, timestamp);
- testHarness.endInput();
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedBefore")
- .getValue())
- .isEqualTo(2L);
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedAfter")
- .getValue())
- .isEqualTo(1L);
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastChangelogFilesCompacted")
- .getValue())
- .isEqualTo(0L);
-
- // operator closed, metric groups should be unregistered
- testHarness.close();
- assertThat(MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedBefore"))
- .isNull();
- assertThat(MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedAfter"))
- .isNull();
- assertThat(MetricUtils.getGauge(compactionMetricGroup,
"lastChangelogFilesCompacted"))
- .isNull();
- }
-
- @Test
- public void testMultiTablesCompactionMetrics(@TempDir java.nio.file.Path
tempDir)
- throws Exception {
- Path warehouse = new Path(TraceableFileIO.SCHEME + "://" + tempDir);
- Options catalogOptions = new Options();
- catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse.toString());
- catalogOptions.set(CatalogOptions.URI, "");
- Catalog.Loader catalogLoader =
- () ->
CatalogFactory.createCatalog(CatalogContext.create(catalogOptions));
- Catalog catalog = catalogLoader.load();
- String databaseName = "test_db";
- catalog.createDatabase(databaseName, true);
- Identifier firstTableId = Identifier.create(databaseName,
"test_table1");
- Identifier secondTableId = Identifier.create(databaseName,
"test_table2");
- FileStoreTable firstTable = createCatalogTable(catalog, firstTableId);
- FileStoreTable secondTable = createCatalogTable(catalog,
secondTableId);
- prepareDataFile(firstTable);
- prepareDataFile(secondTable);
-
- MultiTablesStoreCompactOperator operator =
createMultiTablesCompactOperator(catalogLoader);
- OneInputStreamOperatorTestHarness<RowData, MultiTableCommittable>
testHarness =
- createMultiTablesTestHarness(operator);
- testHarness.open();
-
- MetricGroup compactionMetricGroup =
- operator.getMetricGroup()
- .addGroup("paimon")
- .addGroup("table", firstTable.name())
- .addGroup("partition", "_")
- .addGroup("bucket", "0")
- .addGroup("compaction");
- DataFileMetaSerializer fileMetaSerializer = new
DataFileMetaSerializer();
- RowData record =
- new FlinkRowData(
- GenericRow.of(
- 1L,
- serializeBinaryRow(BinaryRow.EMPTY_ROW),
- 0,
-
fileMetaSerializer.serializeList(Collections.emptyList()),
- BinaryString.fromString(databaseName),
-
BinaryString.fromString(firstTableId.getObjectName())));
-
- long timestamp = 0;
- testHarness.processElement(record, timestamp++);
- testHarness.endInput();
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedBefore")
- .getValue())
- .isEqualTo(2L);
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedAfter")
- .getValue())
- .isEqualTo(1L);
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastChangelogFilesCompacted")
- .getValue())
- .isEqualTo(0L);
-
- compactionMetricGroup =
- operator.getMetricGroup()
- .addGroup("paimon")
- .addGroup("table", secondTable.name())
- .addGroup("partition", "_")
- .addGroup("bucket", "0")
- .addGroup("compaction");
- record =
- new FlinkRowData(
- GenericRow.of(
- 2L,
- serializeBinaryRow(BinaryRow.EMPTY_ROW),
- 0,
-
fileMetaSerializer.serializeList(Collections.emptyList()),
- BinaryString.fromString(databaseName),
-
BinaryString.fromString(secondTableId.getObjectName())));
-
- testHarness.processElement(record, timestamp);
- testHarness.endInput();
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedBefore")
- .getValue())
- .isEqualTo(2L);
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedAfter")
- .getValue())
- .isEqualTo(1L);
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastChangelogFilesCompacted")
- .getValue())
- .isEqualTo(0L);
-
- // operator closed, metric groups should be unregistered
- testHarness.close();
- assertThat(MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedBefore"))
- .isNull();
- assertThat(MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedAfter"))
- .isNull();
- assertThat(MetricUtils.getGauge(compactionMetricGroup,
"lastChangelogFilesCompacted"))
- .isNull();
- }
-
private List<Map<String, String>> getSpecifiedPartitions() {
Map<String, String> partition1 = new HashMap<>();
partition1.put("dt", "20221208");
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
index 608b659eb..23b983b06 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
@@ -19,10 +19,8 @@
package org.apache.paimon.flink.sink;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.flink.utils.MetricUtils;
import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.operation.KeyValueFileStoreWrite;
@@ -45,7 +43,6 @@ import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.state.StateInitializationContextImpl;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -55,15 +52,12 @@ import
org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import
org.apache.flink.streaming.api.operators.collect.utils.MockOperatorStateStore;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.nio.file.Path;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
@@ -88,142 +82,6 @@ public class FlinkSinkTest {
assertThat(testSpillable(streamExecutionEnvironment,
fileStoreTable)).isFalse();
}
- @Test
- public void testCompactionMetrics() throws Exception {
- FileStoreTable table = createFileStoreTable();
- RowDataStoreWriteOperator operator = createWriteOperator(table);
- OneInputStreamOperatorTestHarness<InternalRow, Committable>
testHarness =
- createTestHarness(operator);
- MetricGroup compactionMetricGroup =
- operator.getMetricGroup()
- .addGroup("paimon")
- .addGroup("table", table.name())
- .addGroup("partition", "_")
- .addGroup("bucket", "0")
- .addGroup("compaction");
- testHarness.open();
-
- final GenericRow row1 = GenericRow.of(1, 2);
- final GenericRow row2 = GenericRow.of(2, 3);
- final GenericRow row3 = GenericRow.of(3, 4);
- final GenericRow row4 = GenericRow.of(4, 5);
-
- List<StreamRecord<InternalRow>> streamRecords = new ArrayList<>();
- streamRecords.add(new StreamRecord<>(row1));
- streamRecords.add(new StreamRecord<>(row2));
- streamRecords.add(new StreamRecord<>(row3));
-
- long cpId = 1L;
- testHarness.processElements(streamRecords);
- operator.write.compact(BinaryRow.EMPTY_ROW, 0, true);
- operator.write.prepareCommit(true, cpId++);
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedBefore")
- .getValue())
- .isEqualTo(1L);
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedAfter")
- .getValue())
- .isEqualTo(1L);
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastChangelogFilesCompacted")
- .getValue())
- .isEqualTo(0L);
-
- testHarness.processElement(row4, 0);
- operator.write.compact(BinaryRow.EMPTY_ROW, 0, true);
- operator.write.prepareCommit(true, cpId);
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedBefore")
- .getValue())
- .isEqualTo(2L);
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedAfter")
- .getValue())
- .isEqualTo(1L);
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastChangelogFilesCompacted")
- .getValue())
- .isEqualTo(0L);
-
- // operator closed, metric groups should be unregistered
- testHarness.close();
- assertThat(MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedBefore"))
- .isNull();
- assertThat(MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedAfter"))
- .isNull();
- assertThat(MetricUtils.getGauge(compactionMetricGroup,
"lastChangelogFilesCompacted"))
- .isNull();
- }
-
- @Test
- public void testDynamicBucketCompactionMetrics() throws Exception {
- FileStoreTable table = createFileStoreTable();
- DynamicBucketRowWriteOperator operator =
createDynamicBucketWriteOperator(table);
- OneInputStreamOperatorTestHarness<Tuple2<InternalRow, Integer>,
Committable> testHarness =
- createDynamicBucketTestHarness(operator);
- MetricGroup compactionMetricGroup =
- operator.getMetricGroup()
- .addGroup("paimon")
- .addGroup("table", table.name())
- .addGroup("partition", "_")
- .addGroup("bucket", "0")
- .addGroup("compaction");
- testHarness.open();
-
- final GenericRow row1 = GenericRow.of(1, 2);
- final GenericRow row2 = GenericRow.of(2, 3);
- final GenericRow row3 = GenericRow.of(3, 4);
- final GenericRow row4 = GenericRow.of(4, 5);
-
- List<StreamRecord<Tuple2<InternalRow, Integer>>> streamRecords = new
ArrayList<>();
- streamRecords.add(new StreamRecord<>(Tuple2.of(row1, 0)));
- streamRecords.add(new StreamRecord<>(Tuple2.of(row2, 1)));
- streamRecords.add(new StreamRecord<>(Tuple2.of(row3, 2)));
-
- long cpId = 1L;
- testHarness.processElements(streamRecords);
- operator.write.compact(BinaryRow.EMPTY_ROW, 0, true);
- operator.write.prepareCommit(true, cpId++);
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedBefore")
- .getValue())
- .isEqualTo(1L);
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedAfter")
- .getValue())
- .isEqualTo(1L);
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastChangelogFilesCompacted")
- .getValue())
- .isEqualTo(0L);
-
- testHarness.processElement(Tuple2.of(row4, 0), 0);
- operator.write.compact(BinaryRow.EMPTY_ROW, 0, true);
- operator.write.prepareCommit(true, cpId);
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedBefore")
- .getValue())
- .isEqualTo(2L);
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedAfter")
- .getValue())
- .isEqualTo(1L);
- assertThat(
- MetricUtils.getGauge(compactionMetricGroup,
"lastChangelogFilesCompacted")
- .getValue())
- .isEqualTo(0L);
-
- // operator closed, metric groups should be unregistered
- testHarness.close();
- assertThat(MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedBefore"))
- .isNull();
- assertThat(MetricUtils.getGauge(compactionMetricGroup,
"lastTableFilesCompactedAfter"))
- .isNull();
- assertThat(MetricUtils.getGauge(compactionMetricGroup,
"lastChangelogFilesCompacted"))
- .isNull();
- }
-
private boolean testSpillable(
StreamExecutionEnvironment streamExecutionEnvironment,
FileStoreTable fileStoreTable)
throws Exception {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
index 3730a1930..cc7bb27a9 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
@@ -28,7 +28,7 @@ import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.VersionedSerializerWrapper;
-import org.apache.paimon.flink.utils.MetricUtils;
+import org.apache.paimon.flink.utils.TestingMetricUtils;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.manifest.WrappedManifestCommittable;
@@ -586,25 +586,39 @@ class StoreMultiCommitterTest {
.addGroup("table", table2.name())
.addGroup("commit");
- assertThat(MetricUtils.getGauge(commitMetricGroup1,
"lastTableFilesAdded").getValue())
+ assertThat(
+ TestingMetricUtils.getGauge(commitMetricGroup1,
"lastTableFilesAdded")
+ .getValue())
.isEqualTo(1L);
- assertThat(MetricUtils.getGauge(commitMetricGroup1,
"lastTableFilesDeleted").getValue())
+ assertThat(
+ TestingMetricUtils.getGauge(commitMetricGroup1,
"lastTableFilesDeleted")
+ .getValue())
.isEqualTo(0L);
- assertThat(MetricUtils.getGauge(commitMetricGroup1,
"lastTableFilesAppended").getValue())
+ assertThat(
+ TestingMetricUtils.getGauge(commitMetricGroup1,
"lastTableFilesAppended")
+ .getValue())
.isEqualTo(1L);
assertThat(
- MetricUtils.getGauge(commitMetricGroup1,
"lastTableFilesCommitCompacted")
+ TestingMetricUtils.getGauge(
+ commitMetricGroup1,
"lastTableFilesCommitCompacted")
.getValue())
.isEqualTo(0L);
- assertThat(MetricUtils.getGauge(commitMetricGroup2,
"lastTableFilesAdded").getValue())
+ assertThat(
+ TestingMetricUtils.getGauge(commitMetricGroup2,
"lastTableFilesAdded")
+ .getValue())
.isEqualTo(4L);
- assertThat(MetricUtils.getGauge(commitMetricGroup2,
"lastTableFilesDeleted").getValue())
+ assertThat(
+ TestingMetricUtils.getGauge(commitMetricGroup2,
"lastTableFilesDeleted")
+ .getValue())
.isEqualTo(3L);
- assertThat(MetricUtils.getGauge(commitMetricGroup2,
"lastTableFilesAppended").getValue())
+ assertThat(
+ TestingMetricUtils.getGauge(commitMetricGroup2,
"lastTableFilesAppended")
+ .getValue())
.isEqualTo(3L);
assertThat(
- MetricUtils.getGauge(commitMetricGroup2,
"lastTableFilesCommitCompacted")
+ TestingMetricUtils.getGauge(
+ commitMetricGroup2,
"lastTableFilesCommitCompacted")
.getValue())
.isEqualTo(4L);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTestBase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTestBase.java
index 82d7b0e3e..1fc6e8e61 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTestBase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTestBase.java
@@ -23,7 +23,7 @@ import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.utils.InternalRowTypeSerializer;
import org.apache.paimon.flink.utils.InternalTypeInfo;
-import org.apache.paimon.flink.utils.MetricUtils;
+import org.apache.paimon.flink.utils.TestingMetricUtils;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.ConfigOption;
@@ -38,9 +38,7 @@ import org.apache.paimon.types.RowType;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -92,25 +90,6 @@ public abstract class WriterOperatorTestBase {
harness.notifyOfCompletedCheckpoint(1);
OperatorMetricGroup metricGroup =
rowDataStoreWriteOperator.getMetricGroup();
- MetricGroup writerMetricGroup =
- metricGroup
- .addGroup("paimon")
- .addGroup("table", tableName)
- .addGroup("partition", "_")
- .addGroup("bucket", "0")
- .addGroup("writer");
-
- Counter writeRecordCount = MetricUtils.getCounter(writerMetricGroup,
"writeRecordCount");
- Assertions.assertThat(writeRecordCount.getCount()).isEqualTo(size);
-
- // test histogram has sample
- Histogram flushCostMS = MetricUtils.getHistogram(writerMetricGroup,
"flushCostMillis");
- Assertions.assertThat(flushCostMS.getCount()).isGreaterThan(0);
-
- Histogram prepareCommitCostMS =
- MetricUtils.getHistogram(writerMetricGroup,
"prepareCommitCostMillis");
- Assertions.assertThat(prepareCommitCostMS.getCount()).isGreaterThan(0);
-
MetricGroup writerBufferMetricGroup =
metricGroup
.addGroup("paimon")
@@ -118,17 +97,17 @@ public abstract class WriterOperatorTestBase {
.addGroup("writerBuffer");
Gauge<Long> bufferPreemptCount =
- MetricUtils.getGauge(writerBufferMetricGroup,
"bufferPreemptCount");
+ TestingMetricUtils.getGauge(writerBufferMetricGroup,
"bufferPreemptCount");
Assertions.assertThat(bufferPreemptCount.getValue()).isEqualTo(0);
Gauge<Long> totalWriteBufferSizeByte =
- MetricUtils.getGauge(writerBufferMetricGroup,
"totalWriteBufferSizeByte");
+ TestingMetricUtils.getGauge(writerBufferMetricGroup,
"totalWriteBufferSizeByte");
Assertions.assertThat(totalWriteBufferSizeByte.getValue()).isEqualTo(256);
GenericRow row = GenericRow.of(1, 1);
harness.processElement(row, 1);
Gauge<Long> usedWriteBufferSizeByte =
- MetricUtils.getGauge(writerBufferMetricGroup,
"usedWriteBufferSizeByte");
+ TestingMetricUtils.getGauge(writerBufferMetricGroup,
"usedWriteBufferSizeByte");
Assertions.assertThat(usedWriteBufferSizeByte.getValue()).isGreaterThan(0);
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java
index 27039a6ee..13613dab0 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java
@@ -20,7 +20,7 @@ package org.apache.paimon.flink.source;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.flink.utils.MetricUtils;
+import org.apache.paimon.flink.utils.TestingMetricUtils;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
@@ -87,9 +87,11 @@ public class FileStoreSourceMetricsTest {
1,
FlinkConnectorOptions.SplitAssignMode.FAIR);
staticFileStoreSource.restoreEnumerator(context, null);
- assertThat(MetricUtils.getGauge(scanMetricGroup,
"lastScannedManifests").getValue())
+ assertThat(TestingMetricUtils.getGauge(scanMetricGroup,
"lastScannedManifests").getValue())
.isEqualTo(1L);
- assertThat(MetricUtils.getGauge(scanMetricGroup,
"lastScanResultedTableFiles").getValue())
+ assertThat(
+ TestingMetricUtils.getGauge(scanMetricGroup,
"lastScanResultedTableFiles")
+ .getValue())
.isEqualTo(1L);
}
@@ -102,20 +104,24 @@ public class FileStoreSourceMetricsTest {
(ContinuousFileSplitEnumerator)
continuousFileStoreSource.restoreEnumerator(context,
null);
enumerator.scanNextSnapshot();
- assertThat(MetricUtils.getHistogram(scanMetricGroup,
"scanDuration").getCount())
+ assertThat(TestingMetricUtils.getHistogram(scanMetricGroup,
"scanDuration").getCount())
.isEqualTo(1);
- assertThat(MetricUtils.getGauge(scanMetricGroup,
"lastScannedManifests").getValue())
+ assertThat(TestingMetricUtils.getGauge(scanMetricGroup,
"lastScannedManifests").getValue())
.isEqualTo(1L);
- assertThat(MetricUtils.getGauge(scanMetricGroup,
"lastScanResultedTableFiles").getValue())
+ assertThat(
+ TestingMetricUtils.getGauge(scanMetricGroup,
"lastScanResultedTableFiles")
+ .getValue())
.isEqualTo(1L);
writeAgain();
enumerator.scanNextSnapshot();
- assertThat(MetricUtils.getHistogram(scanMetricGroup,
"scanDuration").getCount())
+ assertThat(TestingMetricUtils.getHistogram(scanMetricGroup,
"scanDuration").getCount())
.isEqualTo(2);
- assertThat(MetricUtils.getGauge(scanMetricGroup,
"lastScannedManifests").getValue())
+ assertThat(TestingMetricUtils.getGauge(scanMetricGroup,
"lastScannedManifests").getValue())
.isEqualTo(1L);
- assertThat(MetricUtils.getGauge(scanMetricGroup,
"lastScanResultedTableFiles").getValue())
+ assertThat(
+ TestingMetricUtils.getGauge(scanMetricGroup,
"lastScanResultedTableFiles")
+ .getValue())
.isEqualTo(1L);
}
@@ -125,9 +131,11 @@ public class FileStoreSourceMetricsTest {
FlinkSource logHybridFileStoreSource =
LogHybridSourceFactory.buildHybridFirstSource(table, null,
null);
logHybridFileStoreSource.restoreEnumerator(context, null);
- assertThat(MetricUtils.getGauge(scanMetricGroup,
"lastScannedManifests").getValue())
+ assertThat(TestingMetricUtils.getGauge(scanMetricGroup,
"lastScannedManifests").getValue())
.isEqualTo(1L);
- assertThat(MetricUtils.getGauge(scanMetricGroup,
"lastScanResultedTableFiles").getValue())
+ assertThat(
+ TestingMetricUtils.getGauge(scanMetricGroup,
"lastScanResultedTableFiles")
+ .getValue())
.isEqualTo(1L);
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
index 3c046457e..0f6d66815 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
@@ -23,7 +23,7 @@ import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.flink.utils.MetricUtils;
+import org.apache.paimon.flink.utils.TestingMetricUtils;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
@@ -194,24 +194,26 @@ public class OperatorSourceTest {
MetricGroup readerOperatorMetricGroup = readOperator.getMetricGroup();
harness.open();
assertThat(
- MetricUtils.getGauge(readerOperatorMetricGroup,
"currentFetchEventTimeLag")
+ TestingMetricUtils.getGauge(
+ readerOperatorMetricGroup,
"currentFetchEventTimeLag")
.getValue())
.isEqualTo(-1L);
assertThat(
- MetricUtils.getGauge(readerOperatorMetricGroup,
"currentEmitEventTimeLag")
+ TestingMetricUtils.getGauge(
+ readerOperatorMetricGroup,
"currentEmitEventTimeLag")
.getValue())
.isEqualTo(-1L);
harness.processElement(new StreamRecord<>(splits.get(0)));
assertThat(
(Long)
- MetricUtils.getGauge(
+ TestingMetricUtils.getGauge(
readerOperatorMetricGroup,
"currentFetchEventTimeLag")
.getValue())
.isGreaterThan(0);
assertThat(
(Long)
- MetricUtils.getGauge(
+ TestingMetricUtils.getGauge(
readerOperatorMetricGroup,
"currentEmitEventTimeLag")
.getValue())
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/MetricUtils.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/TestingMetricUtils.java
similarity index 98%
rename from
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/MetricUtils.java
rename to
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/TestingMetricUtils.java
index b37e4db80..39ab5ae30 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/MetricUtils.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/TestingMetricUtils.java
@@ -29,7 +29,7 @@ import java.lang.reflect.Field;
import java.util.Map;
/** Test utils for Flink's {@link Metric}s. */
-public class MetricUtils {
+public class TestingMetricUtils {
public static <T> Gauge<T> getGauge(MetricGroup group, String metricName) {
return (Gauge<T>) getMetric(group, metricName);