This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 30a4bfbd2d [core] Honor the compaction optimization interval across 
Merge tree writer re-creation (#7825)
30a4bfbd2d is described below

commit 30a4bfbd2d9934b8f2965fa1573bbcf343ccf75b
Author: Arnav Balyan <[email protected]>
AuthorDate: Mon May 18 13:57:29 2026 +0530

    [core] Honor the compaction optimization interval across Merge tree writer 
re-creation (#7825)
    
    - Today `compaction.optimization-interval` is not respected. When this
    interval is set, the full compaction is supposed to run at most once
    per interval. In practice it runs every time the writer is re-created,
    which is much more frequent than the interval and can silently inflate
    I/O and CPU usage.
    - The last full-compaction timestamp is an in-memory field that starts
    as `null`. The `MergeTreeWriter` is re-created many times, and the
    timestamp is lost each time, causing the compaction to fire more
    frequently than configured.
    - This can be avoided by preserving the timestamp of the last full
    compaction. Today this timestamp is lost whenever `MergeTreeWriter` is
    re-created, but it can be recovered from `restoreFiles`.
    - Fix this by computing the most recent `creationTimeEpochMillis` of the
    files at the max LSM level in `restoreFiles` and passing it to
    `EarlyFullCompaction.create`.
---
 .../mergetree/compact/EarlyFullCompaction.java     | 18 +++++++-
 .../compact/MergeTreeCompactManagerFactory.java    | 27 ++++++++++--
 .../mergetree/compact/EarlyFullCompactionTest.java | 49 ++++++++++++++++++++++
 3 files changed, 89 insertions(+), 5 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/EarlyFullCompaction.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/EarlyFullCompaction.java
index 7b7185dd7b..272d14dcea 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/EarlyFullCompaction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/EarlyFullCompaction.java
@@ -48,13 +48,28 @@ public class EarlyFullCompaction {
             @Nullable Long fullCompactionInterval,
             @Nullable Long totalSizeThreshold,
             @Nullable Long incrementalSizeThreshold) {
+        this(fullCompactionInterval, totalSizeThreshold, 
incrementalSizeThreshold, null);
+    }
+
+    public EarlyFullCompaction(
+            @Nullable Long fullCompactionInterval,
+            @Nullable Long totalSizeThreshold,
+            @Nullable Long incrementalSizeThreshold,
+            @Nullable Long initialLastFullCompaction) {
         this.fullCompactionInterval = fullCompactionInterval;
         this.totalSizeThreshold = totalSizeThreshold;
         this.incrementalSizeThreshold = incrementalSizeThreshold;
+        this.lastFullCompaction = initialLastFullCompaction;
     }
 
     @Nullable
     public static EarlyFullCompaction create(CoreOptions options) {
+        return create(options, null);
+    }
+
+    @Nullable
+    public static EarlyFullCompaction create(
+            CoreOptions options, @Nullable Long initialLastFullCompaction) {
         Duration interval = options.optimizedCompactionInterval();
         MemorySize totalThreshold = options.compactionTotalSizeThreshold();
         MemorySize incrementalThreshold = 
options.compactionIncrementalSizeThreshold();
@@ -64,7 +79,8 @@ public class EarlyFullCompaction {
         return new EarlyFullCompaction(
                 interval == null ? null : interval.toMillis(),
                 totalThreshold == null ? null : totalThreshold.getBytes(),
-                incrementalThreshold == null ? null : 
incrementalThreshold.getBytes());
+                incrementalThreshold == null ? null : 
incrementalThreshold.getBytes(),
+                initialLastFullCompaction);
     }
 
     public Optional<CompactUnit> tryFullCompact(int numLevels, 
List<LevelSortedRun> runs) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java
index 4c393c3815..89d1931384 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java
@@ -152,7 +152,7 @@ public class MergeTreeCompactManagerFactory implements 
KvCompactionManagerFactor
             return new NoopCompactManager();
         }
 
-        CompactStrategy compactStrategy = createCompactStrategy(options);
+        CompactStrategy compactStrategy = createCompactStrategy(options, 
restoreFiles);
         Comparator<InternalRow> keyComparator = keyComparatorSupplier.get();
         Levels levels = new Levels(keyComparator, restoreFiles, 
options.numLevels());
         @Nullable FieldsComparator userDefinedSeqComparator = 
udsComparatorSupplier.get();
@@ -188,7 +188,10 @@ public class MergeTreeCompactManagerFactory implements 
KvCompactionManagerFactor
                 options.isChainTable());
     }
 
-    private CompactStrategy createCompactStrategy(CoreOptions options) {
+    private CompactStrategy createCompactStrategy(
+            CoreOptions options, List<DataFileMeta> restoreFiles) {
+        Long initialLastFullCompaction =
+                estimateLastFullCompactionTime(restoreFiles, 
options.numLevels());
         if (options.needLookup()) {
             Integer compactMaxInterval = null;
             switch (options.lookupCompact()) {
@@ -203,7 +206,7 @@ public class MergeTreeCompactManagerFactory implements 
KvCompactionManagerFactor
                             options.maxSizeAmplificationPercent(),
                             options.sortedRunSizeRatio(),
                             options.numSortedRunCompactionTrigger(),
-                            EarlyFullCompaction.create(options),
+                            EarlyFullCompaction.create(options, 
initialLastFullCompaction),
                             OffPeakHours.create(options)),
                     compactMaxInterval);
         }
@@ -213,7 +216,7 @@ public class MergeTreeCompactManagerFactory implements 
KvCompactionManagerFactor
                         options.maxSizeAmplificationPercent(),
                         options.sortedRunSizeRatio(),
                         options.numSortedRunCompactionTrigger(),
-                        EarlyFullCompaction.create(options),
+                        EarlyFullCompaction.create(options, 
initialLastFullCompaction),
                         OffPeakHours.create(options));
         if (options.compactionForceUpLevel0()) {
             return new ForceUpLevel0Compaction(universal, null);
@@ -222,6 +225,22 @@ public class MergeTreeCompactManagerFactory implements 
KvCompactionManagerFactor
         }
     }
 
+    @Nullable
+    private static Long estimateLastFullCompactionTime(
+            List<DataFileMeta> restoreFiles, int numLevels) {
+        int maxLevel = numLevels - 1;
+        long max = -1;
+        for (DataFileMeta f : restoreFiles) {
+            if (f.level() == maxLevel) {
+                long t = f.creationTimeEpochMillis();
+                if (t > max) {
+                    max = t;
+                }
+            }
+        }
+        return max < 0 ? null : max;
+    }
+
     private MergeTreeCompactRewriter createRewriter(
             BinaryRow partition,
             int bucket,
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/EarlyFullCompactionTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/EarlyFullCompactionTest.java
index b4f1850527..f7b95da879 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/EarlyFullCompactionTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/EarlyFullCompactionTest.java
@@ -90,6 +90,41 @@ public class EarlyFullCompactionTest {
         assertThat(trigger.tryFullCompact(5, createRuns(100, 200))).isEmpty();
     }
 
+    @Test
+    public void testCreateWithInitialLastFullCompaction() {
+        Options options = new Options();
+        options.set(COMPACTION_OPTIMIZATION_INTERVAL, Duration.ofHours(1));
+        EarlyFullCompaction trigger =
+                EarlyFullCompaction.create(new CoreOptions(options), 
1234567890L);
+        assertThat(trigger).isNotNull();
+    }
+
+    @Test
+    public void testIntervalDoesNotRetriggerWhenSeededRecent() {
+        AtomicLong time = new AtomicLong(10_000);
+        TestableEarlyFullCompaction trigger =
+                new TestableEarlyFullCompaction(1000L, null, null, 9_500L, 
time);
+
+        Optional<CompactUnit> compactUnit = trigger.tryFullCompact(5, 
createRuns(100, 200));
+        assertThat(compactUnit).isEmpty();
+
+        time.addAndGet(501); // now 10_501, diff vs seed (9_500) is 1001 > 1000
+        compactUnit = trigger.tryFullCompact(5, createRuns(100, 200));
+        assertThat(compactUnit).isPresent();
+        assertThat(compactUnit.get().outputLevel()).isEqualTo(4);
+    }
+
+    @Test
+    public void testIntervalTriggersWhenSeededOld() {
+        AtomicLong time = new AtomicLong(10_000);
+        TestableEarlyFullCompaction trigger =
+                new TestableEarlyFullCompaction(1000L, null, null, 5_000L, 
time);
+
+        Optional<CompactUnit> compactUnit = trigger.tryFullCompact(5, 
createRuns(100, 200));
+        assertThat(compactUnit).isPresent();
+        assertThat(compactUnit.get().outputLevel()).isEqualTo(4);
+    }
+
     @Test
     public void testInterval() {
         AtomicLong time = new AtomicLong(10_000);
@@ -280,6 +315,20 @@ public class EarlyFullCompactionTest {
             this.currentTime = currentTime;
         }
 
+        public TestableEarlyFullCompaction(
+                @Nullable Long fullCompactionInterval,
+                @Nullable Long totalSizeThreshold,
+                @Nullable Long incrementalSizeThreshold,
+                @Nullable Long initialLastFullCompaction,
+                AtomicLong currentTime) {
+            super(
+                    fullCompactionInterval,
+                    totalSizeThreshold,
+                    incrementalSizeThreshold,
+                    initialLastFullCompaction);
+            this.currentTime = currentTime;
+        }
+
         @Override
         long currentTimeMillis() {
             return currentTime.get();

Reply via email to