This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d8661586ea [core] Introduce gentle lookup compaction mode to reduce 
overall compaction frequency (#5178)
d8661586ea is described below

commit d8661586ea9d317aa4da6788072547fb939478f5
Author: xiangyu0xf <[email protected]>
AuthorDate: Mon Mar 3 14:13:24 2025 +0800

    [core] Introduce gentle lookup compaction mode to reduce overall compaction 
frequency (#5178)
---
 .../shortcodes/generated/core_configuration.html   |  12 ++
 .../main/java/org/apache/paimon/CoreOptions.java   |  44 +++++++
 .../java/org/apache/paimon/utils/MathUtils.java    |  11 ++
 .../org/apache/paimon/compact/CompactManager.java  |   5 +-
 .../apache/paimon/mergetree/MergeTreeWriter.java   |   2 +-
 .../mergetree/compact/ForceUpLevel0Compaction.java |  14 +--
 .../mergetree/compact/MergeTreeCompactManager.java |  15 ++-
 .../mergetree/compact/UniversalCompaction.java     |  50 ++++++++
 .../paimon/operation/AbstractFileStoreWrite.java   |   2 +
 .../paimon/operation/KeyValueFileStoreWrite.java   |  40 ++++--
 .../java/org/apache/paimon/utils/RecordWriter.java |   5 +-
 .../apache/paimon/mergetree/MergeTreeTestBase.java |   4 +-
 .../compact/MergeTreeCompactManagerTest.java       |  49 ++++++++
 .../mergetree/compact/UniversalCompactionTest.java |  62 ++++++++++
 .../operation/KeyValueFileStoreWriteTest.java      | 134 +++++++++++++++++++++
 .../org/apache/paimon/flink/sink/FlinkSink.java    |   2 +-
 .../paimon/flink/sink/WriterOperatorTest.java      | 124 +++++++++++++++++++
 17 files changed, 545 insertions(+), 30 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 6a2510369d..8e845b5a23 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -435,6 +435,18 @@ Mainly to resolve data skew on primary keys. We recommend 
starting with 64 mb wh
             <td>Integer</td>
             <td>The maximal fan-in for external merge sort. It limits the 
number of file handles. If it is too small, may cause intermediate merging. But 
if it is too large, it will cause too many files opened at the same time, 
consume memory and lead to random reading.</td>
         </tr>
+        <tr>
+            <td><h5>lookup-compact</h5></td>
+            <td style="word-wrap: break-word;">RADICAL</td>
+            <td><p>Enum</p></td>
+            <td>Lookup compact mode used for lookup compaction.<br /><br 
/>Possible values:<ul><li>"RADICAL"</li><li>"GENTLE"</li></ul></td>
+        </tr>
+        <tr>
+            <td><h5>lookup-compact.max-interval</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Integer</td>
+            <td>The max interval for a gentle mode lookup compaction to be 
triggered. For every interval, a forced lookup compaction will be performed to 
flush L0 files to higher level. This option is only valid when lookup-compact 
mode is gentle.</td>
+        </tr>
         <tr>
             <td><h5>lookup-wait</h5></td>
             <td style="word-wrap: break-word;">true</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index dcb5588fa7..0341ab6a49 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1523,6 +1523,21 @@ public class CoreOptions implements Serializable {
                     .withDescription(
                             "When need to lookup, commit will wait for 
compaction by lookup.");
 
+    public static final ConfigOption<LookupCompactMode> LOOKUP_COMPACT =
+            key("lookup-compact")
+                    .enumType(LookupCompactMode.class)
+                    .defaultValue(LookupCompactMode.RADICAL)
+                    .withDescription("Lookup compact mode used for lookup 
compaction.");
+
+    public static final ConfigOption<Integer> LOOKUP_COMPACT_MAX_INTERVAL =
+            key("lookup-compact.max-interval")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The max interval for a gentle mode lookup 
compaction to be triggered. For every interval, "
+                                    + "a forced lookup compaction will be 
performed to flush L0 files to higher level. "
+                                    + "This option is only valid when 
lookup-compact mode is gentle.");
+
     public static final ConfigOption<Integer> DELETE_FILE_THREAD_NUM =
             key("delete-file.thread-num")
                     .intType()
@@ -2524,6 +2539,23 @@ public class CoreOptions implements Serializable {
         return options.get(LOOKUP_WAIT);
     }
 
+    public boolean statefulLookup() {
+        return needLookup()
+                && (!options.get(LOOKUP_WAIT) || 
LookupCompactMode.GENTLE.equals(lookupCompact()));
+    }
+
+    public LookupCompactMode lookupCompact() {
+        return options.get(LOOKUP_COMPACT);
+    }
+
+    public int lookupCompactMaxInterval() {
+        Integer maxInterval = options.get(LOOKUP_COMPACT_MAX_INTERVAL);
+        if (maxInterval == null) {
+            maxInterval = 
MathUtils.multiplySafely(numSortedRunCompactionTrigger(), 2);
+        }
+        return Math.max(numSortedRunCompactionTrigger(), maxInterval);
+    }
+
     public boolean asyncFileWrite() {
         return options.get(ASYNC_FILE_WRITE);
     }
@@ -3285,4 +3317,16 @@ public class CoreOptions implements Serializable {
             throw new IllegalArgumentException("cannot match type: " + 
orderType + " for ordering");
         }
     }
+
+    /** The compact mode for lookup compaction. */
+    public enum LookupCompactMode {
+        /**
+         * Lookup compaction will use ForceUpLevel0Compaction strategy to 
radically compact new
+         * files.
+         */
+        RADICAL,
+
+        /** Lookup compaction will use UniversalCompaction strategy to gently 
compact new files. */
+        GENTLE
+    }
 }
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/MathUtils.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/MathUtils.java
index ba9994d9b1..bc9103a56d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/MathUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/MathUtils.java
@@ -110,4 +110,15 @@ public class MathUtils {
             return Integer.MAX_VALUE;
         }
     }
+
+    /**
+     * Safely multiply the given int value by another int value, ensuring that 
no overflow occurs.
+     */
+    public static int multiplySafely(int a, int b) {
+        try {
+            return Math.multiplyExact(a, b);
+        } catch (ArithmeticException e) {
+            return Integer.MAX_VALUE;
+        }
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/compact/CompactManager.java 
b/paimon-core/src/main/java/org/apache/paimon/compact/CompactManager.java
index 88897bd860..405e25b052 100644
--- a/paimon-core/src/main/java/org/apache/paimon/compact/CompactManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/compact/CompactManager.java
@@ -52,6 +52,9 @@ public interface CompactManager extends Closeable {
     /** Cancel currently running compaction task. */
     void cancelCompaction();
 
-    /** Check if a compaction is in progress, or if a compaction result 
remains to be fetched. */
+    /**
+     * Check if a compaction is in progress, or if a compaction result remains 
to be fetched, or if
+     * a compaction should be triggered later.
+     */
     boolean isCompacting();
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
index 1c805e764a..18e009d185 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
@@ -142,7 +142,7 @@ public class MergeTreeWriter implements 
RecordWriter<KeyValue>, MemoryOwner {
     }
 
     @VisibleForTesting
-    CompactManager compactManager() {
+    public CompactManager compactManager() {
         return compactManager;
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ForceUpLevel0Compaction.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ForceUpLevel0Compaction.java
index d3ec39cb67..350c5fb056 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ForceUpLevel0Compaction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ForceUpLevel0Compaction.java
@@ -40,18 +40,6 @@ public class ForceUpLevel0Compaction implements 
CompactStrategy {
             return pick;
         }
 
-        // collect all level 0 files
-        int candidateCount = 0;
-        for (int i = candidateCount; i < runs.size(); i++) {
-            if (runs.get(i).level() > 0) {
-                break;
-            }
-            candidateCount++;
-        }
-
-        return candidateCount == 0
-                ? Optional.empty()
-                : Optional.of(
-                        universal.pickForSizeRatio(numLevels - 1, runs, 
candidateCount, true));
+        return universal.forcePickL0(numLevels, runs);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
index 15629f2e66..a08592eee2 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
@@ -63,6 +63,7 @@ public class MergeTreeCompactManager extends 
CompactFutureManager {
     @Nullable private final CompactionMetrics.Reporter metricsReporter;
     @Nullable private final DeletionVectorsMaintainer dvMaintainer;
     private final boolean lazyGenDeletionFile;
+    private final boolean needLookup;
 
     public MergeTreeCompactManager(
             ExecutorService executor,
@@ -74,7 +75,8 @@ public class MergeTreeCompactManager extends 
CompactFutureManager {
             CompactRewriter rewriter,
             @Nullable CompactionMetrics.Reporter metricsReporter,
             @Nullable DeletionVectorsMaintainer dvMaintainer,
-            boolean lazyGenDeletionFile) {
+            boolean lazyGenDeletionFile,
+            boolean needLookup) {
         this.executor = executor;
         this.levels = levels;
         this.strategy = strategy;
@@ -85,6 +87,7 @@ public class MergeTreeCompactManager extends 
CompactFutureManager {
         this.metricsReporter = metricsReporter;
         this.dvMaintainer = dvMaintainer;
         this.lazyGenDeletionFile = lazyGenDeletionFile;
+        this.needLookup = needLookup;
 
         MetricUtils.safeCall(this::reportMetrics, LOG);
     }
@@ -240,6 +243,11 @@ public class MergeTreeCompactManager extends 
CompactFutureManager {
         return result;
     }
 
+    @Override
+    public boolean isCompacting() {
+        return super.isCompacting() || (needLookup && 
!levels().level0().isEmpty());
+    }
+
     private void reportMetrics() {
         if (metricsReporter != null) {
             metricsReporter.reportLevel0FileCount(levels.level0().size());
@@ -254,4 +262,9 @@ public class MergeTreeCompactManager extends 
CompactFutureManager {
             MetricUtils.safeCall(metricsReporter::unregister, LOG);
         }
     }
+
+    @VisibleForTesting
+    public CompactStrategy getStrategy() {
+        return strategy;
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
index c31aec682b..056c852e59 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
@@ -31,6 +31,7 @@ import javax.annotation.Nullable;
 import java.time.Duration;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Universal Compaction Style is a compaction style, targeting the use cases 
requiring lower write
@@ -50,6 +51,9 @@ public class UniversalCompaction implements CompactStrategy {
     @Nullable private final Long opCompactionInterval;
     @Nullable private Long lastOptimizedCompaction;
 
+    @Nullable private final Integer maxLookupCompactInterval;
+    @Nullable private final AtomicInteger lookupCompactTriggerCount;
+
     public UniversalCompaction(int maxSizeAmp, int sizeRatio, int 
numRunCompactionTrigger) {
         this(maxSizeAmp, sizeRatio, numRunCompactionTrigger, null);
     }
@@ -59,11 +63,22 @@ public class UniversalCompaction implements CompactStrategy 
{
             int sizeRatio,
             int numRunCompactionTrigger,
             @Nullable Duration opCompactionInterval) {
+        this(maxSizeAmp, sizeRatio, numRunCompactionTrigger, 
opCompactionInterval, null);
+    }
+
+    public UniversalCompaction(
+            int maxSizeAmp,
+            int sizeRatio,
+            int numRunCompactionTrigger,
+            @Nullable Duration opCompactionInterval,
+            @Nullable Integer maxLookupCompactInterval) {
         this.maxSizeAmp = maxSizeAmp;
         this.sizeRatio = sizeRatio;
         this.numRunCompactionTrigger = numRunCompactionTrigger;
         this.opCompactionInterval =
                 opCompactionInterval == null ? null : 
opCompactionInterval.toMillis();
+        this.maxLookupCompactInterval = maxLookupCompactInterval;
+        this.lookupCompactTriggerCount = new AtomicInteger(0);
     }
 
     @Override
@@ -107,9 +122,44 @@ public class UniversalCompaction implements 
CompactStrategy {
             return Optional.ofNullable(pickForSizeRatio(maxLevel, runs, 
candidateCount));
         }
 
+        // 4 checking if a forced L0 compact should be triggered
+        if (maxLookupCompactInterval != null && lookupCompactTriggerCount != 
null) {
+            lookupCompactTriggerCount.getAndIncrement();
+            if 
(lookupCompactTriggerCount.compareAndSet(maxLookupCompactInterval, 0)) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(
+                            "Universal compaction due to max lookup compaction 
interval {}.",
+                            maxLookupCompactInterval);
+                }
+                return forcePickL0(numLevels, runs);
+            } else {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(
+                            "Skip universal compaction due to lookup 
compaction trigger count {} is less than the max interval {}.",
+                            lookupCompactTriggerCount.get(),
+                            maxLookupCompactInterval);
+                }
+            }
+        }
+
         return Optional.empty();
     }
 
+    Optional<CompactUnit> forcePickL0(int numLevels, List<LevelSortedRun> 
runs) {
+        // collect all level 0 files
+        int candidateCount = 0;
+        for (int i = candidateCount; i < runs.size(); i++) {
+            if (runs.get(i).level() > 0) {
+                break;
+            }
+            candidateCount++;
+        }
+
+        return candidateCount == 0
+                ? Optional.empty()
+                : Optional.of(pickForSizeRatio(numLevels - 1, runs, 
candidateCount, true));
+    }
+
     @VisibleForTesting
     CompactUnit pickForSizeAmp(int maxLevel, List<LevelSortedRun> runs) {
         if (runs.size() < numRunCompactionTrigger) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index 9b7c1a8fba..dcf460cd5a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -290,6 +290,8 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
         //
         // Condition 2: No compaction is in progress. That is, no more 
changelog will be
         // produced.
+        //
+        // Condition 3: The writer has no postponed compaction like gentle 
lookup compaction.
         return writerContainer ->
                 writerContainer.lastModifiedCommitIdentifier < 
latestCommittedIdentifier
                         && !writerContainer.writer.isCompacting();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 0d2f824282..a851c92ba4 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -196,16 +196,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                 writerFactoryBuilder.build(partition, bucket, options);
         Comparator<InternalRow> keyComparator = keyComparatorSupplier.get();
         Levels levels = new Levels(keyComparator, restoreFiles, 
options.numLevels());
-        UniversalCompaction universalCompaction =
-                new UniversalCompaction(
-                        options.maxSizeAmplificationPercent(),
-                        options.sortedRunSizeRatio(),
-                        options.numSortedRunCompactionTrigger(),
-                        options.optimizedCompactionInterval());
-        CompactStrategy compactStrategy =
-                options.needLookup()
-                        ? new ForceUpLevel0Compaction(universalCompaction)
-                        : universalCompaction;
+        CompactStrategy compactStrategy = createCompactStrategy(options);
         CompactManager compactManager =
                 createCompactManager(
                         partition, bucket, compactStrategy, compactExecutor, 
levels, dvMaintainer);
@@ -232,6 +223,32 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
         return options.writeBufferSpillable(fileIO.isObjectStore(), 
isStreamingMode, true);
     }
 
+    private CompactStrategy createCompactStrategy(CoreOptions options) {
+        if (options.needLookup()) {
+            if 
(CoreOptions.LookupCompactMode.RADICAL.equals(options.lookupCompact())) {
+                return new ForceUpLevel0Compaction(
+                        new UniversalCompaction(
+                                options.maxSizeAmplificationPercent(),
+                                options.sortedRunSizeRatio(),
+                                options.numSortedRunCompactionTrigger(),
+                                options.optimizedCompactionInterval()));
+            } else if 
(CoreOptions.LookupCompactMode.GENTLE.equals(options.lookupCompact())) {
+                return new UniversalCompaction(
+                        options.maxSizeAmplificationPercent(),
+                        options.sortedRunSizeRatio(),
+                        options.numSortedRunCompactionTrigger(),
+                        options.optimizedCompactionInterval(),
+                        options.lookupCompactMaxInterval());
+            }
+        }
+
+        return new UniversalCompaction(
+                options.maxSizeAmplificationPercent(),
+                options.sortedRunSizeRatio(),
+                options.numSortedRunCompactionTrigger(),
+                options.optimizedCompactionInterval());
+    }
+
     private CompactManager createCompactManager(
             BinaryRow partition,
             int bucket,
@@ -264,7 +281,8 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                             ? null
                             : compactionMetrics.createReporter(partition, 
bucket),
                     dvMaintainer,
-                    options.prepareCommitWaitCompaction());
+                    options.prepareCommitWaitCompaction(),
+                    options.needLookup());
         }
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/RecordWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/RecordWriter.java
index 2ccf4cc861..7955c85a73 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/RecordWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/RecordWriter.java
@@ -64,7 +64,10 @@ public interface RecordWriter<T> {
      */
     CommitIncrement prepareCommit(boolean waitCompaction) throws Exception;
 
-    /** Check if a compaction is in progress, or if a compaction result 
remains to be fetched. */
+    /**
+     * Check if a compaction is in progress, or if a compaction result remains 
to be fetched, or if
+     * a compaction should be triggered later.
+     */
     boolean isCompacting();
 
     /**
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index e987e2ee99..01a9326aed 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -540,7 +540,8 @@ public abstract class MergeTreeTestBase {
                 new TestRewriter(),
                 null,
                 null,
-                false);
+                false,
+                options.needLookup());
     }
 
     static class MockFailResultCompactionManager extends 
MergeTreeCompactManager {
@@ -562,6 +563,7 @@ public abstract class MergeTreeTestBase {
                     rewriter,
                     null,
                     null,
+                    false,
                     false);
         }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
index 8484525c82..8e55ac2bef 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
@@ -179,6 +179,54 @@ public class MergeTreeCompactManagerTest {
                 Collections.singletonList(new LevelMinMax(2, 1, 10)));
     }
 
+    @Test
+    public void testIsCompacting() {
+        List<LevelMinMax> inputs =
+                Arrays.asList(
+                        new LevelMinMax(0, 1, 3),
+                        new LevelMinMax(1, 1, 5),
+                        new LevelMinMax(1, 6, 7));
+        List<DataFileMeta> files = new ArrayList<>();
+
+        for (int i = 0; i < inputs.size(); i++) {
+            LevelMinMax minMax = inputs.get(i);
+            files.add(minMax.toFile(i));
+        }
+
+        Levels levels = new Levels(comparator, files, 3);
+
+        MergeTreeCompactManager lookupManager =
+                new MergeTreeCompactManager(
+                        service,
+                        levels,
+                        testStrategy(),
+                        comparator,
+                        2,
+                        Integer.MAX_VALUE,
+                        new TestRewriter(true),
+                        null,
+                        null,
+                        false,
+                        true);
+
+        MergeTreeCompactManager defaultManager =
+                new MergeTreeCompactManager(
+                        service,
+                        levels,
+                        testStrategy(),
+                        comparator,
+                        2,
+                        Integer.MAX_VALUE,
+                        new TestRewriter(true),
+                        null,
+                        null,
+                        false,
+                        false);
+
+        assertThat(lookupManager.isCompacting()).isTrue();
+        assertThat(defaultManager.isCompacting()).isFalse();
+    }
+
     private void innerTest(List<LevelMinMax> inputs, List<LevelMinMax> 
expected)
             throws ExecutionException, InterruptedException {
         innerTest(inputs, expected, testStrategy(), true);
@@ -207,6 +255,7 @@ public class MergeTreeCompactManagerTest {
                         new TestRewriter(expectedDropDelete),
                         null,
                         null,
+                        false,
                         false);
         manager.triggerCompaction(false);
         manager.getCompactionResult(true);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
index 793aea7598..b5587e4752 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
@@ -292,6 +292,68 @@ public class UniversalCompactionTest {
         assertThat(pick.get().outputLevel()).isEqualTo(1);
     }
 
+    @Test
+    public void testForcePickL0() {
+        int maxInterval = 5;
+        UniversalCompaction compaction = new UniversalCompaction(25, 1, 5, 
null, maxInterval);
+
+        // level 0 to max level
+        List<LevelSortedRun> level0ToMax = level0(1, 2, 2, 2);
+        Optional<CompactUnit> pick;
+        long[] results;
+
+        for (int i = 1; i <= maxInterval; i++) {
+            if (i == maxInterval) {
+                // level 0 to max level triggered
+                pick = compaction.pick(3, level0ToMax);
+                assertThat(pick.isPresent()).isTrue();
+                results = 
pick.get().files().stream().mapToLong(DataFileMeta::fileSize).toArray();
+                assertThat(results).isEqualTo(new long[] {1, 2, 2, 2});
+                assertThat(pick.get().outputLevel()).isEqualTo(2);
+            } else {
+                // compact skipped
+                pick = compaction.pick(3, level0ToMax);
+                assertThat(pick.isPresent()).isFalse();
+            }
+        }
+
+        // level 0 force pick
+        List<LevelSortedRun> level0ForcePick = Arrays.asList(level(0, 2), 
level(1, 2), level(2, 2));
+
+        for (int i = 1; i <= maxInterval; i++) {
+            if (i == maxInterval) {
+                // level 0 force pick triggered
+                pick = compaction.pick(3, level0ForcePick);
+                assertThat(pick.isPresent()).isTrue();
+                results = 
pick.get().files().stream().mapToLong(DataFileMeta::fileSize).toArray();
+                assertThat(results).isEqualTo(new long[] {2, 2, 2});
+                assertThat(pick.get().outputLevel()).isEqualTo(2);
+            } else {
+                // compact skipped
+                pick = compaction.pick(3, level0ForcePick);
+                assertThat(pick.isPresent()).isFalse();
+            }
+        }
+
+        // level 0 to empty level
+        List<LevelSortedRun> level0ToEmpty = Arrays.asList(level(0, 1), 
level(2, 2));
+
+        for (int i = 1; i <= maxInterval; i++) {
+            if (i == maxInterval) {
+                // level 0 to empty level triggered
+                pick = compaction.pick(3, level0ToEmpty);
+                assertThat(pick.isPresent()).isTrue();
+                results = 
pick.get().files().stream().mapToLong(DataFileMeta::fileSize).toArray();
+                assertThat(results).isEqualTo(new long[] {1});
+                assertThat(pick.get().outputLevel()).isEqualTo(1);
+            } else {
+                // compact skipped
+                pick = compaction.pick(3, level0ToEmpty);
+                assertThat(pick.isPresent()).isFalse();
+            }
+        }
+    }
+
     private List<LevelSortedRun> createLevels(int... levels) {
         List<LevelSortedRun> runs = new ArrayList<>();
         for (int size : levels) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreWriteTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreWriteTest.java
new file mode 100644
index 0000000000..0098b44b91
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreWriteTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.TestFileStore;
+import org.apache.paimon.TestKeyValueGenerator;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.disk.IOManagerImpl;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.mergetree.MergeTreeWriter;
+import org.apache.paimon.mergetree.compact.CompactStrategy;
+import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.paimon.mergetree.compact.ForceUpLevel0Compaction;
+import org.apache.paimon.mergetree.compact.MergeTreeCompactManager;
+import org.apache.paimon.mergetree.compact.UniversalCompaction;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link KeyValueFileStoreWrite}. */
+public class KeyValueFileStoreWriteTest {
+
+    private static final int NUM_BUCKETS = 10;
+
+    @TempDir java.nio.file.Path tempDir;
+
+    private IOManager ioManager;
+
+    @BeforeEach
+    public void before() throws IOException {
+        this.ioManager = new IOManagerImpl(tempDir.toString());
+    }
+
+    @Test
+    public void testRadicalLookupCompactStrategy() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.DELETION_VECTORS_ENABLED.key(), "true");
+        options.put(CoreOptions.LOOKUP_COMPACT.key(), "radical");
+
+        KeyValueFileStoreWrite write = createWriteWithOptions(options);
+        write.withIOManager(ioManager);
+        TestKeyValueGenerator gen = new TestKeyValueGenerator();
+
+        KeyValue keyValue = gen.next();
+        AbstractFileStoreWrite.WriterContainer<KeyValue> writerContainer =
+                write.createWriterContainer(gen.getPartition(keyValue), 1, 
false);
+        MergeTreeWriter writer = (MergeTreeWriter) writerContainer.writer;
+        try (MergeTreeCompactManager compactManager =
+                (MergeTreeCompactManager) writer.compactManager()) {
+            CompactStrategy compactStrategy = compactManager.getStrategy();
+            
assertThat(compactStrategy).isInstanceOf(ForceUpLevel0Compaction.class);
+        }
+    }
+
+    @Test
+    public void testGentleLookupCompactStrategy() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.DELETION_VECTORS_ENABLED.key(), "true");
+        options.put(CoreOptions.LOOKUP_COMPACT.key(), "gentle");
+
+        KeyValueFileStoreWrite write = createWriteWithOptions(options);
+        write.withIOManager(ioManager);
+        TestKeyValueGenerator gen = new TestKeyValueGenerator();
+
+        KeyValue keyValue = gen.next();
+        AbstractFileStoreWrite.WriterContainer<KeyValue> writerContainer =
+                write.createWriterContainer(gen.getPartition(keyValue), 1, 
false);
+        MergeTreeWriter writer = (MergeTreeWriter) writerContainer.writer;
+        try (MergeTreeCompactManager compactManager =
+                (MergeTreeCompactManager) writer.compactManager()) {
+            CompactStrategy compactStrategy = compactManager.getStrategy();
+            
assertThat(compactStrategy).isInstanceOf(UniversalCompaction.class);
+        }
+    }
+
+    private KeyValueFileStoreWrite createWriteWithOptions(Map<String, String> 
options)
+            throws Exception {
+        SchemaManager schemaManager =
+                new SchemaManager(LocalFileIO.create(), new 
Path(tempDir.toUri()));
+
+        TableSchema schema =
+                schemaManager.createTable(
+                        new Schema(
+                                
TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(),
+                                
TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
+                                TestKeyValueGenerator.getPrimaryKeys(
+                                        
TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED),
+                                options,
+                                null));
+        TestFileStore store =
+                new TestFileStore.Builder(
+                                "avro",
+                                tempDir.toString(),
+                                NUM_BUCKETS,
+                                TestKeyValueGenerator.DEFAULT_PART_TYPE,
+                                TestKeyValueGenerator.KEY_TYPE,
+                                TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+                                
TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
+                                DeduplicateMergeFunction.factory(),
+                                schema)
+                        .build();
+
+        return (KeyValueFileStoreWrite) store.newWrite();
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 4c56b13e50..5de819348f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -141,7 +141,7 @@ public abstract class FlinkSink<T> implements Serializable {
             }
         }
 
-        if (coreOptions.needLookup() && !coreOptions.prepareCommitWaitCompaction()) {
+        if (coreOptions.statefulLookup()) {
             return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> {
                 assertNoSinkMaterializer.run();
                 return new AsyncLookupSinkWrite(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
index 2e55fc919b..007d4a59ce 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
@@ -243,6 +243,130 @@ public class WriterOperatorTest {
                 .containsExactlyInAnyOrder("+I[1, 10, 101]", "+I[2, 20, 200]", "+I[3, 30, 301]");
     }
 
+    @Test
+    public void testGentleLookupWithFailure() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.INT()},
+                        new String[] {"pt", "k", "v"});
+
+        int lookupCompactMaxInterval = 5;
+
+        Options options = new Options();
+        options.set("bucket", "1");
+        options.set("changelog-producer", "lookup");
+        options.set(CoreOptions.LOOKUP_COMPACT, CoreOptions.LookupCompactMode.GENTLE);
+        options.set(CoreOptions.LOOKUP_COMPACT_MAX_INTERVAL, lookupCompactMaxInterval);
+
+        FileStoreTable fileStoreTable =
+                createFileStoreTable(
+                        rowType, Arrays.asList("pt", "k"), Collections.singletonList("k"), options);
+
+        RowDataStoreWriteOperator.Factory operatorFactory =
+                getAsyncLookupWriteOperatorFactory(fileStoreTable, false);
+        OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
+                createHarness(operatorFactory);
+
+        TableCommitImpl commit = fileStoreTable.newCommit(commitUser);
+
+        TypeSerializer<Committable> serializer =
+                new CommittableTypeInfo().createSerializer(new ExecutionConfig());
+        harness.setup(serializer);
+        harness.open();
+
+        // write basic records
+        harness.processElement(GenericRow.of(1, 10, 100), 1);
+        harness.processElement(GenericRow.of(2, 20, 200), 2);
+        harness.processElement(GenericRow.of(3, 30, 300), 3);
+        harness.prepareSnapshotPreBarrier(1);
+        harness.snapshot(1, 10);
+        harness.notifyOfCompletedCheckpoint(1);
+        commitAll(harness, commit, 1);
+
+        harness.processElement(GenericRow.of(1, 10, 101), 11);
+        harness.processElement(GenericRow.of(3, 30, 301), 13);
+        harness.prepareSnapshotPreBarrier(2);
+        OperatorSubtaskState state = harness.snapshot(2, 20);
+        harness.notifyOfCompletedCheckpoint(2);
+        commitAll(harness, commit, 2);
+
+        // operator is closed due to failure
+        harness.close();
+
+        // restore operator to trigger gentle lookup compaction
+        operatorFactory = getAsyncLookupWriteOperatorFactory(fileStoreTable, true);
+        harness = createHarness(operatorFactory);
+        harness.setup(serializer);
+        harness.initializeState(state);
+        harness.open();
+
+        // write nothing, wait for compaction
+        harness.prepareSnapshotPreBarrier(3);
+        harness.snapshot(3, 30);
+        harness.notifyOfCompletedCheckpoint(3);
+        commitAll(harness, commit, 3);
+
+        harness.close();
+
+        // check gentle lookup compaction result, no data available
+        ReadBuilder readBuilder = fileStoreTable.newReadBuilder();
+        StreamTableScan scan = readBuilder.newStreamScan();
+        List<Split> splits = scan.plan().splits();
+        TableRead read = readBuilder.newRead();
+        RecordReader<InternalRow> reader = read.createReader(splits);
+        List<String> actual = new ArrayList<>();
+        reader.forEachRemaining(
+                row ->
+                        actual.add(
+                                String.format(
+                                        "%s[%d, %d, %d]",
+                                        row.getRowKind().shortString(),
+                                        row.getInt(0),
+                                        row.getInt(1),
+                                        row.getInt(2))));
+        assertThat(actual).isEmpty();
+
+        // restore operator to force trigger gentle lookup compaction
+        operatorFactory = getAsyncLookupWriteOperatorFactory(fileStoreTable, true);
+        harness = createHarness(operatorFactory);
+        harness.setup(serializer);
+        harness.initializeState(state);
+        harness.open();
+
+        // force trigger gentle lookup compaction by max interval
+        // start count from 1, since the first lookup compact will be triggered when initialize
+        // AsyncLookupWriteOperator
+        for (int i = 1; i < lookupCompactMaxInterval; i++) {
+            long checkpointId = i + 3;
+            harness.prepareSnapshotPreBarrier(checkpointId);
+            harness.snapshot(checkpointId, i + 30);
+            harness.notifyOfCompletedCheckpoint(checkpointId);
+            commitAll(harness, commit, checkpointId);
+        }
+
+        harness.close();
+        commit.close();
+
+        // check all partition result
+        readBuilder = fileStoreTable.newReadBuilder();
+        scan = readBuilder.newStreamScan();
+        splits = scan.plan().splits();
+        read = readBuilder.newRead();
+        reader = read.createReader(splits);
+        List<String> finalResult = new ArrayList<>();
+        reader.forEachRemaining(
+                row ->
+                        finalResult.add(
+                                String.format(
+                                        "%s[%d, %d, %d]",
+                                        row.getRowKind().shortString(),
+                                        row.getInt(0),
+                                        row.getInt(1),
+                                        row.getInt(2))));
+        assertThat(finalResult)
+                .containsExactlyInAnyOrder("+I[1, 10, 101]", "+I[2, 20, 200]", "+I[3, 30, 301]");
+    }
+
     @Test
     public void testChangelog() throws Exception {
         testChangelog(false);


Reply via email to