This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 30a4bfbd2d [core] Honor the compaction optimization interval across
Merge tree writer re-creation (#7825)
30a4bfbd2d is described below
commit 30a4bfbd2d9934b8f2965fa1573bbcf343ccf75b
Author: Arnav Balyan <[email protected]>
AuthorDate: Mon May 18 13:57:29 2026 +0530
[core] Honor the compaction optimization interval across Merge tree writer
re-creation (#7825)
- Today `compaction.optimization-interval` is not respected. When this
interval is set, full compaction is supposed to run at most once per
interval. In practice it runs on every writer re-creation, far more
frequently than the interval allows, which can silently inflate I/O and
CPU usage.
- The last full-compaction timestamp is an in-memory field that starts as
`null`. Because `MergeTreeWriter` is re-created many times, this field is
lost each time, causing full compaction to fire more frequently.
- This can be avoided by preserving the timestamp of the last full
compaction. Today this timestamp is lost whenever `MergeTreeWriter` is
re-created, but it can be estimated from `restoreFiles`.
- Fix this by computing the most recent `creationTimeEpochMillis` among
files at the max LSM level in `restoreFiles` and passing it to
`EarlyFullCompaction.create` as the initial last-full-compaction time.
---
.../mergetree/compact/EarlyFullCompaction.java | 18 +++++++-
.../compact/MergeTreeCompactManagerFactory.java | 27 ++++++++++--
.../mergetree/compact/EarlyFullCompactionTest.java | 49 ++++++++++++++++++++++
3 files changed, 89 insertions(+), 5 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/EarlyFullCompaction.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/EarlyFullCompaction.java
index 7b7185dd7b..272d14dcea 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/EarlyFullCompaction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/EarlyFullCompaction.java
@@ -48,13 +48,28 @@ public class EarlyFullCompaction {
@Nullable Long fullCompactionInterval,
@Nullable Long totalSizeThreshold,
@Nullable Long incrementalSizeThreshold) {
+ this(fullCompactionInterval, totalSizeThreshold,
incrementalSizeThreshold, null);
+ }
+
+ public EarlyFullCompaction(
+ @Nullable Long fullCompactionInterval,
+ @Nullable Long totalSizeThreshold,
+ @Nullable Long incrementalSizeThreshold,
+ @Nullable Long initialLastFullCompaction) { // epoch millis of the last known full compaction, or null
this.fullCompactionInterval = fullCompactionInterval;
this.totalSizeThreshold = totalSizeThreshold;
this.incrementalSizeThreshold = incrementalSizeThreshold;
+ this.lastFullCompaction = initialLastFullCompaction; // null reproduces the old unseeded start
}
@Nullable
public static EarlyFullCompaction create(CoreOptions options) {
+ return create(options, null); // no seed: same behavior as before this change
+ }
+
+ @Nullable
+ public static EarlyFullCompaction create(
+ CoreOptions options, @Nullable Long initialLastFullCompaction) {
Duration interval = options.optimizedCompactionInterval();
MemorySize totalThreshold = options.compactionTotalSizeThreshold();
MemorySize incrementalThreshold =
options.compactionIncrementalSizeThreshold();
@@ -64,7 +79,8 @@ public class EarlyFullCompaction {
return new EarlyFullCompaction(
interval == null ? null : interval.toMillis(),
totalThreshold == null ? null : totalThreshold.getBytes(),
- incrementalThreshold == null ? null :
incrementalThreshold.getBytes());
+ incrementalThreshold == null ? null :
incrementalThreshold.getBytes(),
+ initialLastFullCompaction);
}
public Optional<CompactUnit> tryFullCompact(int numLevels,
List<LevelSortedRun> runs) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java
index 4c393c3815..89d1931384 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java
@@ -152,7 +152,7 @@ public class MergeTreeCompactManagerFactory implements
KvCompactionManagerFactor
return new NoopCompactManager();
}
- CompactStrategy compactStrategy = createCompactStrategy(options);
+ CompactStrategy compactStrategy = createCompactStrategy(options,
restoreFiles);
Comparator<InternalRow> keyComparator = keyComparatorSupplier.get();
Levels levels = new Levels(keyComparator, restoreFiles,
options.numLevels());
@Nullable FieldsComparator userDefinedSeqComparator =
udsComparatorSupplier.get();
@@ -188,7 +188,10 @@ public class MergeTreeCompactManagerFactory implements
KvCompactionManagerFactor
options.isChainTable());
}
- private CompactStrategy createCompactStrategy(CoreOptions options) {
+ private CompactStrategy createCompactStrategy(
+ CoreOptions options, List<DataFileMeta> restoreFiles) {
+ Long initialLastFullCompaction =
+ estimateLastFullCompactionTime(restoreFiles,
options.numLevels());
if (options.needLookup()) {
Integer compactMaxInterval = null;
switch (options.lookupCompact()) {
@@ -203,7 +206,7 @@ public class MergeTreeCompactManagerFactory implements
KvCompactionManagerFactor
options.maxSizeAmplificationPercent(),
options.sortedRunSizeRatio(),
options.numSortedRunCompactionTrigger(),
- EarlyFullCompaction.create(options),
+ EarlyFullCompaction.create(options,
initialLastFullCompaction),
OffPeakHours.create(options)),
compactMaxInterval);
}
@@ -213,7 +216,7 @@ public class MergeTreeCompactManagerFactory implements
KvCompactionManagerFactor
options.maxSizeAmplificationPercent(),
options.sortedRunSizeRatio(),
options.numSortedRunCompactionTrigger(),
- EarlyFullCompaction.create(options),
+ EarlyFullCompaction.create(options,
initialLastFullCompaction),
OffPeakHours.create(options));
if (options.compactionForceUpLevel0()) {
return new ForceUpLevel0Compaction(universal, null);
@@ -222,6 +225,22 @@ public class MergeTreeCompactManagerFactory implements
KvCompactionManagerFactor
}
}
+ @Nullable // null when no restored file sits at the max level (no seed available)
+ private static Long estimateLastFullCompactionTime(
+ List<DataFileMeta> restoreFiles, int numLevels) {
+ int maxLevel = numLevels - 1; // max LSM level, the output level of a full compaction
+ long max = -1; // sentinel: no max-level file seen yet
+ for (DataFileMeta f : restoreFiles) {
+ if (f.level() == maxLevel) { // NOTE(review): an estimate; other compactions may also write this level
+ long t = f.creationTimeEpochMillis();
+ if (t > max) { // keep the most recent creation time
+ max = t;
+ }
+ }
+ }
+ return max < 0 ? null : max; // null leaves lastFullCompaction unset, as before this change
+ }
+
private MergeTreeCompactRewriter createRewriter(
BinaryRow partition,
int bucket,
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/EarlyFullCompactionTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/EarlyFullCompactionTest.java
index b4f1850527..f7b95da879 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/EarlyFullCompactionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/EarlyFullCompactionTest.java
@@ -90,6 +90,41 @@ public class EarlyFullCompactionTest {
assertThat(trigger.tryFullCompact(5, createRuns(100, 200))).isEmpty();
}
+ @Test
+ public void testCreateWithInitialLastFullCompaction() {
+ Options options = new Options();
+ options.set(COMPACTION_OPTIMIZATION_INTERVAL, Duration.ofHours(1));
+ EarlyFullCompaction trigger =
+ EarlyFullCompaction.create(new CoreOptions(options),
1234567890L);
+ assertThat(trigger).isNotNull(); // smoke test only; seeding behavior is checked by the TestableEarlyFullCompaction tests
+ }
+
+ @Test
+ public void testIntervalDoesNotRetriggerWhenSeededRecent() {
+ AtomicLong time = new AtomicLong(10_000); // fake clock: now = 10_000 ms
+ TestableEarlyFullCompaction trigger =
+ new TestableEarlyFullCompaction(1000L, null, null, 9_500L,
time); // interval 1000 ms, seeded 500 ms before now
+
+ Optional<CompactUnit> compactUnit = trigger.tryFullCompact(5,
createRuns(100, 200));
+ assertThat(compactUnit).isEmpty(); // only 500 ms since the seed: still inside the interval
+
+ time.addAndGet(501); // now 10_501, diff vs seed (9_500) is 1001 > 1000
+ compactUnit = trigger.tryFullCompact(5, createRuns(100, 200));
+ assertThat(compactUnit).isPresent();
+ assertThat(compactUnit.get().outputLevel()).isEqualTo(4); // numLevels 5 -> max level 4
+ }
+
+ @Test
+ public void testIntervalTriggersWhenSeededOld() {
+ AtomicLong time = new AtomicLong(10_000); // fake clock: now = 10_000 ms
+ TestableEarlyFullCompaction trigger =
+ new TestableEarlyFullCompaction(1000L, null, null, 5_000L,
time); // seed is 5_000 ms old, well past the 1000 ms interval
+
+ Optional<CompactUnit> compactUnit = trigger.tryFullCompact(5,
createRuns(100, 200));
+ assertThat(compactUnit).isPresent(); // stale seed: fires on the first call
+ assertThat(compactUnit.get().outputLevel()).isEqualTo(4);
+ }
+
@Test
public void testInterval() {
AtomicLong time = new AtomicLong(10_000);
@@ -280,6 +315,20 @@ public class EarlyFullCompactionTest {
this.currentTime = currentTime;
}
+ public TestableEarlyFullCompaction( // seeded variant: forwards initialLastFullCompaction to the parent
+ @Nullable Long fullCompactionInterval,
+ @Nullable Long totalSizeThreshold,
+ @Nullable Long incrementalSizeThreshold,
+ @Nullable Long initialLastFullCompaction,
+ AtomicLong currentTime) {
+ super(
+ fullCompactionInterval,
+ totalSizeThreshold,
+ incrementalSizeThreshold,
+ initialLastFullCompaction);
+ this.currentTime = currentTime; // fake clock read by the currentTimeMillis() override
+ }
+
@Override
long currentTimeMillis() {
return currentTime.get();