This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b110f6ed0a [core] Introduce SimpleLsmKvDb to put and get based on 
Paimon SST (#7407)
b110f6ed0a is described below

commit b110f6ed0abb24ade24129a744d9455791328233
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Mar 12 14:55:01 2026 +0800

    [core] Introduce SimpleLsmKvDb to put and get based on Paimon SST (#7407)
---
 .../apache/paimon/lookup/sort/db/LsmCompactor.java |  733 +++++++++++
 .../paimon/lookup/sort/db/SimpleLsmKvDb.java       |  573 ++++++++
 .../paimon/lookup/sort/db/SstFileMetadata.java     |   90 ++
 .../java/org/apache/paimon/sst/SstFileWriter.java  |    8 +-
 .../paimon/lookup/sort/db/SimpleLsmKvDbTest.java   | 1363 ++++++++++++++++++++
 5 files changed, 2764 insertions(+), 3 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/LsmCompactor.java
 
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/LsmCompactor.java
new file mode 100644
index 0000000000..64e9cff92a
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/LsmCompactor.java
@@ -0,0 +1,733 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.sort.db;
+
+import org.apache.paimon.lookup.sort.SortLookupStoreFactory;
+import org.apache.paimon.lookup.sort.SortLookupStoreReader;
+import org.apache.paimon.lookup.sort.SortLookupStoreWriter;
+import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.sst.BlockIterator;
+import org.apache.paimon.sst.SstFileReader;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+import static org.apache.paimon.lookup.sort.db.SimpleLsmKvDb.isTombstone;
+
+/**
+ * Handles Universal Compaction for the LSM-Tree, inspired by RocksDB's 
Universal Compaction.
+ *
+ * <p>Universal Compaction treats all SST files as a flat list of sorted runs 
ordered from newest to
+ * oldest. Instead of compacting level-by-level, it picks contiguous runs to 
merge based on size
+ * ratios between adjacent runs.
+ *
+ * <p>Compaction trigger: when the number of level-0 files reaches {@code
+ * level0FileNumCompactTrigger}.
+ *
+ * <p>Run selection (from newest to oldest):
+ *
+ * <ol>
+ *   <li><b>Size-ratio based</b>: Starting from the newest run, accumulate 
runs as long as {@code
+ *       accumulatedSize / nextRunSize < sizeRatio%}. If at least 2 runs are 
accumulated, merge
+ *       them.
+ *   <li><b>Fallback</b>: If size-ratio selection finds fewer than 2 
candidates, merge all runs.
+ * </ol>
+ *
+ * <p>Tombstones (empty values) are only removed when all runs are being 
merged (equivalent to
+ * compacting to the bottom level).
+ */
+public class LsmCompactor {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(LsmCompactor.class);
+
+    private final Comparator<MemorySlice> keyComparator;
+    private final SortLookupStoreFactory storeFactory;
+    private final long maxOutputFileSize;
+    private final int level0FileNumCompactTrigger;
+    private final int sizeRatioPercent;
+    private final FileDeleter fileDeleter;
+
    /**
     * Creates a universal compactor.
     *
     * @param keyComparator comparator defining the key order across all SST files
     * @param storeFactory factory used to open readers and writers for SST files
     * @param maxOutputFileSize threshold in bytes at which a merged output file is rolled
     * @param level0FileNumCompactTrigger number of level-0 files that triggers a compaction
     * @param sizeRatioPercent size-ratio threshold (in percent) for candidate run selection
     * @param fileDeleter callback used to delete obsolete SST files after a merge
     */
    public LsmCompactor(
            Comparator<MemorySlice> keyComparator,
            SortLookupStoreFactory storeFactory,
            long maxOutputFileSize,
            int level0FileNumCompactTrigger,
            int sizeRatioPercent,
            FileDeleter fileDeleter) {
        this.keyComparator = keyComparator;
        this.storeFactory = storeFactory;
        this.maxOutputFileSize = maxOutputFileSize;
        this.level0FileNumCompactTrigger = level0FileNumCompactTrigger;
        this.sizeRatioPercent = sizeRatioPercent;
        this.fileDeleter = fileDeleter;
    }
+
    /**
     * Check if compaction is needed and perform it if so.
     *
     * <p>All sorted runs are collected from all levels into a flat list ordered newest-first. If
     * the number of level-0 files reaches {@code level0FileNumCompactTrigger}, a compaction is
     * triggered.
     *
     * <p>When the size-ratio branch is chosen, the candidate range is guaranteed to include all
     * level-0 runs and the level-1 run (if present). This ensures level 0 is fully cleared after
     * compaction and the merged result can be placed into level 1 without overflow.
     *
     * @param levels the multi-level SST file storage (used as a flat sorted-run list)
     * @param maxLevels the maximum number of levels; every non-empty level contributes to the
     *     sorted-run list (level-0 files individually, higher levels as one run each)
     * @param fileSupplier supplier for new SST file paths
     */
    public void maybeCompact(
            List<List<SstFileMetadata>> levels, int maxLevels, FileSupplier fileSupplier)
            throws IOException {
        int levelZeroFileCount = levels.get(0).size();
        if (levelZeroFileCount < level0FileNumCompactTrigger) {
            return;
        }

        List<SortedRun> sortedRuns = collectSortedRuns(levels, maxLevels);

        LOG.info(
                "Universal compaction triggered: {} L0 files (threshold: {}), {} total sorted runs",
                levelZeroFileCount,
                level0FileNumCompactTrigger,
                sortedRuns.size());

        // Try size-ratio based selection: accumulate runs from newest (index 0) to oldest
        int candidateEnd = pickSizeRatioCandidates(sortedRuns);

        // Ensure all L0 runs and the L1 run (if present) are included in the merge.
        // Runs are ordered newest-first (all L0 runs, then L1, L2, ...), so counting the
        // leading runs whose level is <= 1 gives the minimum prefix that must be merged.
        int minCandidateEnd = 0;
        for (SortedRun run : sortedRuns) {
            if (!run.files.isEmpty() && run.files.get(0).getLevel() <= 1) {
                minCandidateEnd++;
            } else {
                break;
            }
        }
        candidateEnd = Math.max(candidateEnd, minCandidateEnd);

        if (candidateEnd >= 2) {
            // Merge runs [0, candidateEnd)
            LOG.info(
                    "Size-ratio compaction: merging {} newest runs out of {}",
                    candidateEnd,
                    sortedRuns.size());
            List<SortedRun> toMerge = new ArrayList<>(sortedRuns.subList(0, candidateEnd));
            List<SortedRun> remaining =
                    new ArrayList<>(sortedRuns.subList(candidateEnd, sortedRuns.size()));
            // Tombstones may only be dropped when no older run remains underneath.
            boolean dropTombstones = remaining.isEmpty();

            // Find the highest level not occupied by remaining runs
            int outputLevel = findHighestFreeLevel(levels, maxLevels, toMerge);
            MergeResult mergeResult =
                    mergeSortedRuns(toMerge, dropTombstones, fileSupplier, outputLevel);

            // Clear levels used by merged runs and place merged result
            clearLevelsOfRuns(levels, toMerge);
            levels.get(outputLevel).addAll(mergeResult.mergedRun.files);
            deleteOldFiles(toMerge, mergeResult.skippedFiles);
        } else {
            // Fallback: merge all runs
            LOG.info("Fallback full compaction: merging all {} runs", sortedRuns.size());
            mergeAllRuns(levels, maxLevels, sortedRuns, fileSupplier);
        }
    }
+
+    /**
+     * Force a full compaction of all sorted runs into a single run.
+     *
+     * @param levels the multi-level SST file storage
+     * @param maxLevels the maximum number of levels
+     * @param fileSupplier supplier for new SST file paths
+     */
+    public void fullCompact(
+            List<List<SstFileMetadata>> levels, int maxLevels, FileSupplier 
fileSupplier)
+            throws IOException {
+        List<SortedRun> sortedRuns = collectSortedRuns(levels, maxLevels);
+        if (sortedRuns.size() <= 1) {
+            return;
+        }
+
+        LOG.info("Full compaction: merging all {} sorted runs", 
sortedRuns.size());
+        mergeAllRuns(levels, maxLevels, sortedRuns, fileSupplier);
+    }
+
+    /**
+     * Merge all sorted runs into a single run, placing the result at the 
highest level. Tombstones
+     * are dropped since there are no older runs below.
+     */
+    private void mergeAllRuns(
+            List<List<SstFileMetadata>> levels,
+            int maxLevels,
+            List<SortedRun> sortedRuns,
+            FileSupplier fileSupplier)
+            throws IOException {
+        int outputLevel = maxLevels - 1;
+        MergeResult mergeResult = mergeSortedRuns(sortedRuns, true, 
fileSupplier, outputLevel);
+
+        for (int i = 0; i < maxLevels; i++) {
+            levels.get(i).clear();
+        }
+        levels.get(outputLevel).addAll(mergeResult.mergedRun.files);
+        deleteOldFiles(sortedRuns, mergeResult.skippedFiles);
+    }
+
+    // 
-------------------------------------------------------------------------
+    //  Universal Compaction internals
+    // 
-------------------------------------------------------------------------
+
+    /**
+     * Collect all SST files into a flat list of sorted runs, ordered 
newest-first.
+     *
+     * <p>Level 0 files are each treated as an individual sorted run (keys may 
overlap). Level 1+
+     * files within the same level form a single sorted run (keys do not 
overlap within a level).
+     */
+    private List<SortedRun> collectSortedRuns(List<List<SstFileMetadata>> 
levels, int maxLevels) {
+        List<SortedRun> runs = new ArrayList<>();
+
+        // Level 0: each file is its own sorted run, newest first (L0 files 
are stored newest-first)
+        List<SstFileMetadata> levelZeroFiles = levels.get(0);
+        for (SstFileMetadata file : levelZeroFiles) {
+            List<SstFileMetadata> singleFile = new ArrayList<>();
+            singleFile.add(file);
+            runs.add(new SortedRun(singleFile));
+        }
+
+        // Level 1+: all files in a level form one sorted run
+        for (int level = 1; level < maxLevels; level++) {
+            List<SstFileMetadata> levelFiles = levels.get(level);
+            if (!levelFiles.isEmpty()) {
+                runs.add(new SortedRun(new ArrayList<>(levelFiles)));
+            }
+        }
+
+        return runs;
+    }
+
+    /**
+     * Pick candidates for size-ratio based compaction.
+     *
+     * <p>Starting from the newest run (index 0), accumulate runs as long as: 
{@code accumulatedSize
+     * / nextRunSize * 100 < sizeRatioPercent}.
+     *
+     * @return the number of runs to merge (from index 0). Returns 0 or 1 if 
no suitable candidates.
+     */
+    private int pickSizeRatioCandidates(List<SortedRun> sortedRuns) {
+        if (sortedRuns.size() < 2) {
+            return 0;
+        }
+
+        long accumulatedSize = sortedRuns.get(0).totalSize();
+        int candidateCount = 1;
+
+        for (int i = 1; i < sortedRuns.size(); i++) {
+            long nextRunSize = sortedRuns.get(i).totalSize();
+
+            // Check: accumulatedSize / nextRunSize * 100 < sizeRatioPercent
+            // Rewritten to avoid floating point: accumulatedSize * 100 < 
sizeRatioPercent *
+            // nextRunSize
+            if (accumulatedSize * 100 < (long) sizeRatioPercent * nextRunSize) 
{
+                accumulatedSize += nextRunSize;
+                candidateCount++;
+            } else {
+                break;
+            }
+        }
+
+        return candidateCount;
+    }
+
    /**
     * Merge multiple sorted runs into a single sorted run using a min-heap based multi-way merge.
     *
     * <p>Files are first grouped by key-range overlap; a group containing a single file can be
     * promoted to the output level unchanged (unless it holds tombstones that must be dropped),
     * avoiding a rewrite of its data.
     *
     * @param runsToMerge the sorted runs to merge (ordered newest-first)
     * @param dropTombstones whether to drop tombstone entries
     * @param fileSupplier supplier for new SST file paths
     * @param outputLevel the level to assign to all output files
     * @return the merged sorted run with output level set on all files
     */
    private MergeResult mergeSortedRuns(
            List<SortedRun> runsToMerge,
            boolean dropTombstones,
            FileSupplier fileSupplier,
            int outputLevel)
            throws IOException {

        // Flatten all input runs into a single file list.
        List<SstFileMetadata> allFiles = new ArrayList<>();
        for (SortedRun run : runsToMerge) {
            allFiles.addAll(run.files);
        }

        long totalInputSize = 0;
        for (SstFileMetadata meta : allFiles) {
            totalInputSize += meta.getFileSize();
        }
        LOG.info(
                "Starting merge: {} runs, {} total files, {} total bytes, dropTombstones={}",
                runsToMerge.size(),
                allFiles.size(),
                totalInputSize,
                dropTombstones);

        long mergeStartTime = System.currentTimeMillis();

        // Partition by key overlap, coalesce undersized neighbors, and record each file's
        // run sequence (newer run = higher sequence) for deduplication during the merge.
        List<List<SstFileMetadata>> groups = groupFilesByKeyOverlap(allFiles);
        List<List<SstFileMetadata>> mergedGroups = mergeSmallAdjacentGroups(groups);
        Map<File, Integer> fileToRunSequence = buildFileToRunSequenceMap(runsToMerge);

        List<SstFileMetadata> outputFiles = new ArrayList<>();
        Set<File> skippedFileSet = new HashSet<>();
        int skippedGroupCount = 0;
        int mergedGroupCount = 0;

        for (List<SstFileMetadata> group : mergedGroups) {
            if (group.size() == 1) {
                // A lone file overlaps nothing, so it can be promoted without rewriting —
                // unless it contains tombstones that this merge is supposed to drop.
                SstFileMetadata singleFile = group.get(0);
                boolean canSkip = !dropTombstones || !singleFile.hasTombstones();
                if (canSkip) {
                    SstFileMetadata promoted = singleFile.withLevel(outputLevel);
                    outputFiles.add(promoted);
                    skippedFileSet.add(promoted.getFile());
                    skippedGroupCount++;
                    continue;
                }
            }

            mergedGroupCount++;
            List<SstFileMetadata> groupMerged =
                    mergeFileGroup(
                            group, dropTombstones, fileSupplier, fileToRunSequence, outputLevel);
            outputFiles.addAll(groupMerged);
        }

        // Output files of a sorted run must be ordered by minimum key.
        outputFiles.sort((a, b) -> keyComparator.compare(a.getMinKey(), b.getMinKey()));

        long mergeElapsedMs = System.currentTimeMillis() - mergeStartTime;
        long totalOutputSize = 0;
        for (SstFileMetadata meta : outputFiles) {
            totalOutputSize += meta.getFileSize();
        }
        LOG.info(
                "Merge completed in {} ms: {} input files ({} bytes) -> {} output files "
                        + "({} bytes), {} groups merged, {} groups skipped",
                mergeElapsedMs,
                allFiles.size(),
                totalInputSize,
                outputFiles.size(),
                totalOutputSize,
                mergedGroupCount,
                skippedGroupCount);

        return new MergeResult(new SortedRun(outputFiles), skippedFileSet, outputLevel);
    }
+
+    /**
+     * Group files into connected components by key-range overlap using a 
sweep-line algorithm.
+     * Files are sorted by minKey, then adjacent files whose key ranges 
overlap are placed in the
+     * same group.
+     */
+    private List<List<SstFileMetadata>> 
groupFilesByKeyOverlap(List<SstFileMetadata> allFiles) {
+        List<SstFileMetadata> sortedFiles = new ArrayList<>(allFiles);
+        sortedFiles.sort((a, b) -> keyComparator.compare(a.getMinKey(), 
b.getMinKey()));
+
+        List<List<SstFileMetadata>> groups = new ArrayList<>();
+        List<SstFileMetadata> currentGroup = new ArrayList<>();
+        MemorySlice currentGroupMaxKey = null;
+
+        for (SstFileMetadata file : sortedFiles) {
+            if (currentGroup.isEmpty()) {
+                currentGroup.add(file);
+                currentGroupMaxKey = file.getMaxKey();
+            } else if (keyComparator.compare(file.getMinKey(), 
currentGroupMaxKey) <= 0) {
+                currentGroup.add(file);
+                if (keyComparator.compare(file.getMaxKey(), 
currentGroupMaxKey) > 0) {
+                    currentGroupMaxKey = file.getMaxKey();
+                }
+            } else {
+                groups.add(currentGroup);
+                currentGroup = new ArrayList<>();
+                currentGroup.add(file);
+                currentGroupMaxKey = file.getMaxKey();
+            }
+        }
+        if (!currentGroup.isEmpty()) {
+            groups.add(currentGroup);
+        }
+        return groups;
+    }
+
+    /**
+     * Merge small adjacent groups to avoid producing too many small files. If 
either the pending
+     * group or the current group is smaller than half of {@link 
#maxOutputFileSize}, they are
+     * combined into a single group.
+     */
+    private List<List<SstFileMetadata>> mergeSmallAdjacentGroups(
+            List<List<SstFileMetadata>> groups) {
+        long smallFileThreshold = maxOutputFileSize / 2;
+        List<List<SstFileMetadata>> mergedGroups = new ArrayList<>();
+        List<SstFileMetadata> pendingGroup = null;
+
+        for (List<SstFileMetadata> group : groups) {
+            if (pendingGroup == null) {
+                pendingGroup = new ArrayList<>(group);
+            } else {
+                long pendingSize = groupTotalSize(pendingGroup);
+                long groupSize = groupTotalSize(group);
+                if (pendingSize < smallFileThreshold || groupSize < 
smallFileThreshold) {
+                    pendingGroup.addAll(group);
+                } else {
+                    mergedGroups.add(pendingGroup);
+                    pendingGroup = new ArrayList<>(group);
+                }
+            }
+        }
+        if (pendingGroup != null) {
+            mergedGroups.add(pendingGroup);
+        }
+        return mergedGroups;
+    }
+
+    /**
+     * Build a mapping from each file to its run sequence number. Older runs 
receive lower sequence
+     * numbers so that during dedup the newest entry (highest sequence) wins.
+     */
+    private static Map<File, Integer> 
buildFileToRunSequenceMap(List<SortedRun> runsToMerge) {
+        Map<File, Integer> fileToRunSequence = new HashMap<>();
+        for (int runIdx = 0; runIdx < runsToMerge.size(); runIdx++) {
+            int sequence = runsToMerge.size() - 1 - runIdx;
+            for (SstFileMetadata meta : runsToMerge.get(runIdx).files) {
+                fileToRunSequence.put(meta.getFile(), sequence);
+            }
+        }
+        return fileToRunSequence;
+    }
+
+    private static long groupTotalSize(List<SstFileMetadata> group) {
+        long size = 0;
+        for (SstFileMetadata meta : group) {
+            size += meta.getFileSize();
+        }
+        return size;
+    }
+
    /**
     * Merge a group of files using min-heap based multi-way merge.
     *
     * <p>Entries with equal keys are deduplicated: the heap orders equal keys by descending run
     * sequence, so the entry from the newest run is emitted and older duplicates are skipped.
     * Output is rolled into a new SST file whenever the accumulated batch reaches {@link
     * #maxOutputFileSize}.
     *
     * @param group the files to merge (may be from different runs)
     * @param dropTombstones whether to drop tombstone entries
     * @param fileSupplier supplier for new SST file paths
     * @param fileToRunSequence maps each file to its run sequence number for dedup ordering
     * @param outputLevel the level to assign to output files
     * @return the list of merged output files
     */
    private List<SstFileMetadata> mergeFileGroup(
            List<SstFileMetadata> group,
            boolean dropTombstones,
            FileSupplier fileSupplier,
            Map<File, Integer> fileToRunSequence,
            int outputLevel)
            throws IOException {

        // Sort files by run sequence (older first) for correct dedup ordering
        List<SstFileMetadata> orderedFiles = new ArrayList<>(group);
        orderedFiles.sort(
                (a, b) -> {
                    int seqA = fileToRunSequence.getOrDefault(a.getFile(), 0);
                    int seqB = fileToRunSequence.getOrDefault(b.getFile(), 0);
                    return Integer.compare(seqA, seqB);
                });

        List<SstFileMetadata> result = new ArrayList<>();
        List<SortLookupStoreReader> openReaders = new ArrayList<>();
        // Heap order: ascending key; for equal keys the higher sequence (newer entry) comes first.
        PriorityQueue<MergeEntry> minHeap =
                new PriorityQueue<>(
                        (a, b) -> {
                            int keyCompare = keyComparator.compare(a.key, b.key);
                            if (keyCompare != 0) {
                                return keyCompare;
                            }
                            return Integer.compare(b.sequence, a.sequence);
                        });

        SortLookupStoreWriter currentWriter = null;
        try {
            // Seed the heap with the first entry of every input file.
            for (int seq = 0; seq < orderedFiles.size(); seq++) {
                SortLookupStoreReader reader =
                        storeFactory.createReader(orderedFiles.get(seq).getFile());
                openReaders.add(reader);
                SstFileReader.SstFileIterator fileIterator = reader.createIterator();
                MergeSource source = new MergeSource(fileIterator, seq);
                if (source.advance()) {
                    minHeap.add(source.currentEntry());
                }
            }
            // Rolling state of the output file currently being written.
            File currentSstFile = null;
            MemorySlice currentFileMinKey = null;
            MemorySlice currentFileMaxKey = null;
            long currentBatchSize = 0;
            long currentTombstoneCount = 0;
            MemorySlice previousKey = null;

            while (!minHeap.isEmpty()) {
                MergeEntry entry = minHeap.poll();

                // Duplicate of the previously emitted key: the newer entry already won
                // (heap ordering), so skip this one but keep its source advancing.
                if (previousKey != null && keyComparator.compare(entry.key, previousKey) == 0) {
                    if (entry.source.advance()) {
                        minHeap.add(entry.source.currentEntry());
                    }
                    continue;
                }

                previousKey = entry.key;

                // Refill the heap from this entry's source before processing it.
                if (entry.source.advance()) {
                    minHeap.add(entry.source.currentEntry());
                }

                if (dropTombstones && isTombstone(entry.value)) {
                    continue;
                }

                // Lazily open an output file on the first surviving entry of a batch.
                if (currentWriter == null) {
                    currentSstFile = fileSupplier.newSstFile();
                    currentWriter = storeFactory.createWriter(currentSstFile, null);
                    currentFileMinKey = entry.key;
                    currentBatchSize = 0;
                    currentTombstoneCount = 0;
                }

                currentWriter.put(entry.key.copyBytes(), entry.value);
                currentFileMaxKey = entry.key;
                currentBatchSize += entry.key.length() + entry.value.length;
                if (isTombstone(entry.value)) {
                    currentTombstoneCount++;
                }

                // Roll to a new output file once the size threshold is reached.
                if (currentBatchSize >= maxOutputFileSize) {
                    currentWriter.close();
                    result.add(
                            new SstFileMetadata(
                                    currentSstFile,
                                    currentFileMinKey,
                                    currentFileMaxKey,
                                    currentTombstoneCount,
                                    outputLevel));
                    currentWriter = null;
                    currentSstFile = null;
                    currentFileMinKey = null;
                    currentFileMaxKey = null;
                }
            }

            // Finish the last partially-filled output file, if any.
            if (currentWriter != null) {
                currentWriter.close();
                result.add(
                        new SstFileMetadata(
                                currentSstFile,
                                currentFileMinKey,
                                currentFileMaxKey,
                                currentTombstoneCount,
                                outputLevel));
            }
        } catch (IOException | RuntimeException e) {
            // Close the in-progress writer on failure to avoid resource leak
            if (currentWriter != null) {
                try {
                    currentWriter.close();
                } catch (IOException suppressed) {
                    e.addSuppressed(suppressed);
                }
            }
            throw e;
        } finally {
            for (SortLookupStoreReader reader : openReaders) {
                reader.close();
            }
        }

        return result;
    }
+
+    /**
+     * Find the highest level that is either empty or occupied by one of the 
runs being merged.
+     * Starts from {@code maxLevels - 1} and works downward, stopping at level 
1 minimum (level 0 is
+     * reserved for new flushes).
+     */
+    private static int findHighestFreeLevel(
+            List<List<SstFileMetadata>> levels, int maxLevels, List<SortedRun> 
runsBeingMerged) {
+        Set<Integer> mergedLevels = new HashSet<>();
+        for (SortedRun run : runsBeingMerged) {
+            for (SstFileMetadata file : run.files) {
+                mergedLevels.add(file.getLevel());
+            }
+        }
+
+        // Start from the highest level and work down; a level is usable if it 
is empty or
+        // all its files belong to the runs being merged
+        for (int level = maxLevels - 1; level >= 1; level--) {
+            if (levels.get(level).isEmpty() || mergedLevels.contains(level)) {
+                return level;
+            }
+        }
+        return 1;
+    }
+
+    /** Clear the level lists for all levels that contain files from the given 
runs. */
+    private static void clearLevelsOfRuns(
+            List<List<SstFileMetadata>> levels, List<SortedRun> runs) {
+        Set<Integer> levelsToClear = new HashSet<>();
+        for (SortedRun run : runs) {
+            for (SstFileMetadata file : run.files) {
+                levelsToClear.add(file.getLevel());
+            }
+        }
+        for (int level : levelsToClear) {
+            if (level >= 0 && level < levels.size()) {
+                levels.get(level).clear();
+            }
+        }
+    }
+
+    /** Delete old SST files from the merged sorted runs, skipping files that 
were preserved. */
+    private void deleteOldFiles(List<SortedRun> oldRuns, Set<File> 
skippedFiles) {
+        for (SortedRun run : oldRuns) {
+            for (SstFileMetadata meta : run.files) {
+                if (!skippedFiles.contains(meta.getFile())) {
+                    fileDeleter.deleteFile(meta.getFile());
+                }
+            }
+        }
+    }
+
+    // 
-------------------------------------------------------------------------
+    //  Sorted Run
+    // 
-------------------------------------------------------------------------
+
+    /** Result of a merge operation, containing the merged run, skipped files 
and output level. */
+    private static final class MergeResult {
+        final SortedRun mergedRun;
+        final Set<File> skippedFiles;
+        final int outputLevel;
+
+        MergeResult(SortedRun mergedRun, Set<File> skippedFiles, int 
outputLevel) {
+            this.mergedRun = mergedRun;
+            this.skippedFiles = skippedFiles;
+            this.outputLevel = outputLevel;
+        }
+    }
+
+    /** A sorted run is a list of SST files whose key ranges do not overlap. */
+    static final class SortedRun {
+
+        final List<SstFileMetadata> files;
+
+        SortedRun(List<SstFileMetadata> files) {
+            this.files = files;
+        }
+
+        long totalSize() {
+            long size = 0;
+            for (SstFileMetadata meta : files) {
+                size += meta.getFileSize();
+            }
+            return size;
+        }
+    }
+
+    // 
-------------------------------------------------------------------------
+    //  Merge helpers
+    // 
-------------------------------------------------------------------------
+
+    /**
+     * Wraps an SST file iterator as a merge source, lazily reading blocks and 
entries. Each source
+     * has a sequence number to resolve key conflicts (higher sequence = newer 
data).
+     */
+    private static final class MergeSource {
+
+        private final SstFileReader.SstFileIterator fileIterator;
+        private final int sequence;
+        private BlockIterator currentBlock;
+        private MergeEntry current;
+
+        MergeSource(SstFileReader.SstFileIterator fileIterator, int sequence) 
throws IOException {
+            this.fileIterator = fileIterator;
+            this.sequence = sequence;
+            this.currentBlock = fileIterator.readBatch();
+        }
+
+        boolean advance() throws IOException {
+            while (true) {
+                if (currentBlock != null && currentBlock.hasNext()) {
+                    Map.Entry<MemorySlice, MemorySlice> entry = 
currentBlock.next();
+                    current =
+                            new MergeEntry(
+                                    entry.getKey(), 
entry.getValue().copyBytes(), sequence, this);
+                    return true;
+                }
+                currentBlock = fileIterator.readBatch();
+                if (currentBlock == null) {
+                    current = null;
+                    return false;
+                }
+            }
+        }
+
+        MergeEntry currentEntry() {
+            return current;
+        }
+    }
+
+    /** A single key-value entry from a merge source, used as a heap element. 
*/
+    private static final class MergeEntry {
+
+        final MemorySlice key;
+        final byte[] value;
+        final int sequence;
+        final MergeSource source;
+
+        MergeEntry(MemorySlice key, byte[] value, int sequence, MergeSource 
source) {
+            this.key = key;
+            this.value = value;
+            this.sequence = sequence;
+            this.source = source;
+        }
+    }
+
+    /** Functional interface for supplying new SST file paths. */
+    public interface FileSupplier {
+        File newSstFile();
+    }
+
+    /** Functional interface for deleting SST files, allowing callers to clean 
up resources. */
+    public interface FileDeleter {
+        void deleteFile(File file);
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDb.java
 
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDb.java
new file mode 100644
index 0000000000..96df7eef9a
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDb.java
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.sort.db;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.compression.CompressOptions;
+import org.apache.paimon.io.cache.CacheManager;
+import org.apache.paimon.lookup.sort.SortLookupStoreFactory;
+import org.apache.paimon.lookup.sort.SortLookupStoreReader;
+import org.apache.paimon.lookup.sort.SortLookupStoreWriter;
+import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.options.MemorySize;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * A simple LSM-Tree based KV database built on top of {@link 
SortLookupStoreFactory}.
+ *
+ * <p>Architecture (Universal Compaction, inspired by RocksDB):
+ *
+ * <pre>
+ *     ┌──────────────────────────────────────────────┐
+ *     │            MemTable (SkipList)                │  ← Active writes
+ *     ├──────────────────────────────────────────────┤
+ *     │  Sorted Runs (newest → oldest):              │
+ *     │    [Run-0] [Run-1] [Run-2] ... [Run-N]       │  ← Each run is a 
sorted SST file set
+ *     └──────────────────────────────────────────────┘
+ * </pre>
+ *
+ * <p>Compaction is triggered when the number of sorted runs exceeds a 
threshold. Runs are selected
+ * for merging based on size ratios between adjacent runs, following RocksDB's 
Universal Compaction
+ * strategy.
+ *
+ * <p>Note: No WAL is implemented. Data in the MemTable will be lost on crash.
+ *
+ * <p>This class is <b>not</b> thread-safe. External synchronization is 
required if accessed from
+ * multiple threads.
+ */
+public class SimpleLsmKvDb implements Closeable {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SimpleLsmKvDb.class);
+
+    /** Tombstone marker for deleted keys. */
+    private static final byte[] TOMBSTONE = new byte[0];
+
+    /** Maximum number of levels in the LSM tree. */
+    static final int MAX_LEVELS = 4;
+
+    /**
+     * Estimated per-entry memory overhead in the MemTable's TreeMap, beyond 
the raw key/value
+     * bytes. This accounts for:
+     *
+     * <ul>
+     *   <li>TreeMap.Entry node: ~64 bytes (object header + 
left/right/parent/key/value refs +
+     *       color)
+     *   <li>MemorySlice wrapper: ~32 bytes (object header + segment ref + 
offset + length)
+     *   <li>MemorySegment backing the key: ~48 bytes (object header + 
heapMemory/offHeapBuffer refs
+     *       + address + size)
+     *   <li>byte[] value array header: ~16 bytes (object header + length)
+     * </ul>
+     */
+    static final long PER_ENTRY_OVERHEAD = 160;
+
+    private final File dataDirectory;
+    private final SortLookupStoreFactory storeFactory;
+    private final Comparator<MemorySlice> keyComparator;
+    private final long memTableFlushThreshold;
+    private final LsmCompactor compactor;
+
+    /** Active MemTable: key -> value bytes (empty byte[] = tombstone). */
+    private TreeMap<MemorySlice, byte[]> memTable;
+
+    /** Estimated size of the current MemTable in bytes. */
+    private long memTableSize;
+
+    /**
+     * Multi-level SST file storage. Each level contains a list of {@link 
SstFileMetadata} ordered
+     * by key range. Level 0 files are ordered newest-first (key ranges may 
overlap). Level 1+ files
+     * are ordered by minKey (key ranges do NOT overlap).
+     */
+    private final List<List<SstFileMetadata>> levels;
+
+    /** Cached readers for SST files, keyed by file path. Lazily populated on 
first lookup. */
+    private final Map<File, SortLookupStoreReader> readerCache;
+
+    private long fileSequence;
+    private boolean closed;
+
+    private SimpleLsmKvDb(
+            File dataDirectory,
+            SortLookupStoreFactory storeFactory,
+            Comparator<MemorySlice> keyComparator,
+            long memTableFlushThreshold,
+            long maxSstFileSize,
+            int level0FileNumCompactTrigger,
+            int sizeRatio) {
+        this.dataDirectory = dataDirectory;
+        this.storeFactory = storeFactory;
+        this.keyComparator = keyComparator;
+        this.memTableFlushThreshold = memTableFlushThreshold;
+        this.memTable = new TreeMap<>(keyComparator);
+        this.memTableSize = 0;
+        this.levels = new ArrayList<>();
+        for (int i = 0; i < MAX_LEVELS; i++) {
+            this.levels.add(new ArrayList<>());
+        }
+        this.readerCache = new HashMap<>();
+        this.fileSequence = 0;
+        this.closed = false;
+        this.compactor =
+                new LsmCompactor(
+                        keyComparator,
+                        storeFactory,
+                        maxSstFileSize,
+                        level0FileNumCompactTrigger,
+                        sizeRatio,
+                        this::closeAndDeleteSstFile);
+    }
+
+    /**
+     * Close the cached reader for the given SST file (if any) and delete the 
file from disk. This
+     * is invoked by {@link LsmCompactor} via the {@link 
LsmCompactor.FileDeleter} callback during
+     * compaction.
+     */
+    private void closeAndDeleteSstFile(File file) {
+        SortLookupStoreReader reader = readerCache.remove(file);
+        if (reader != null) {
+            try {
+                reader.close();
+            } catch (IOException e) {
+                LOG.warn("Failed to close reader for SST file: {}", 
file.getName(), e);
+            }
+        }
+        if (file.exists()) {
+            boolean deleted = file.delete();
+            if (!deleted) {
+                LOG.warn("Failed to delete SST file: {}", file.getName());
+            }
+        }
+    }
+
+    // 
-------------------------------------------------------------------------
+    //  Builder
+    // 
-------------------------------------------------------------------------
+
+    /** Create a builder for {@link SimpleLsmKvDb}. */
+    public static Builder builder(File dataDirectory) {
+        return new Builder(dataDirectory);
+    }
+
+    // 
-------------------------------------------------------------------------
+    //  Write Operations
+    // 
-------------------------------------------------------------------------
+
+    /**
+     * Put a key-value pair into the database.
+     *
+     * @param key the key bytes, must not be null
+     * @param value the value bytes, must not be null
+     */
+    public void put(byte[] key, byte[] value) throws IOException {
+        ensureOpen();
+        if (value.length == 0) {
+            throw new IllegalArgumentException(
+                    "Value must not be an empty byte array, which is reserved 
as TOMBSTONE marker. "
+                            + "Use delete() to remove a key.");
+        }
+        MemorySlice wrappedKey = MemorySlice.wrap(key);
+        byte[] oldValue = memTable.put(wrappedKey, value);
+        long delta = key.length + value.length;
+        if (oldValue != null) {
+            delta -= (key.length + oldValue.length);
+        } else {
+            delta += PER_ENTRY_OVERHEAD;
+        }
+        memTableSize += delta;
+        maybeFlushMemTable();
+    }
+
+    /**
+     * Delete a key from the database by writing a tombstone.
+     *
+     * @param key the key bytes to delete
+     */
+    public void delete(byte[] key) throws IOException {
+        ensureOpen();
+        MemorySlice wrappedKey = MemorySlice.wrap(key);
+        byte[] oldValue = memTable.put(wrappedKey, TOMBSTONE);
+        long delta = key.length;
+        if (oldValue != null) {
+            delta -= (key.length + oldValue.length);
+        } else {
+            delta += PER_ENTRY_OVERHEAD;
+        }
+        memTableSize += delta;
+        maybeFlushMemTable();
+    }
+
+    // 
-------------------------------------------------------------------------
+    //  Read Operations
+    // 
-------------------------------------------------------------------------
+
+    /**
+     * Get the value associated with the given key.
+     *
+     * <p>Search order: MemTable → Level 0 (newest to oldest) → Level 1 → 
Level 2 → ...
+     *
+     * @param key the key bytes
+     * @return the value bytes, or null if the key does not exist or has been 
deleted
+     */
+    @Nullable
+    public byte[] get(byte[] key) throws IOException {
+        ensureOpen();
+
+        // 1. Search MemTable first (newest data)
+        MemorySlice wrappedKey = MemorySlice.wrap(key);
+        byte[] memValue = memTable.get(wrappedKey);
+        if (memValue != null) {
+            return isTombstone(memValue) ? null : memValue;
+        }
+
+        // 2. Search each level from L0 to Lmax
+        for (int level = 0; level < MAX_LEVELS; level++) {
+            List<SstFileMetadata> levelFiles = levels.get(level);
+            if (levelFiles.isEmpty()) {
+                continue;
+            }
+
+            if (level == 0) {
+                // L0: files may have overlapping keys, search newest-first
+                for (SstFileMetadata meta : levelFiles) {
+                    if (!meta.mightContainKey(wrappedKey, keyComparator)) {
+                        continue;
+                    }
+                    byte[] value = lookupInFile(meta.getFile(), key);
+                    if (value != null) {
+                        return isTombstone(value) ? null : value;
+                    }
+                }
+            } else {
+                // L1+: files have non-overlapping key ranges, binary search
+                SstFileMetadata target = findFileForKey(levelFiles, 
wrappedKey);
+                if (target != null) {
+                    byte[] value = lookupInFile(target.getFile(), key);
+                    if (value != null) {
+                        return isTombstone(value) ? null : value;
+                    }
+                }
+            }
+        }
+
+        return null;
+    }
+
+    // 
-------------------------------------------------------------------------
+    //  Flush & Compaction
+    // 
-------------------------------------------------------------------------
+
+    /** Force flush the current MemTable to a Level 0 SST file. */
+    public void flush() throws IOException {
+        ensureOpen();
+        if (memTable.isEmpty()) {
+            return;
+        }
+
+        TreeMap<MemorySlice, byte[]> snapshot = memTable;
+        memTable = new TreeMap<>(keyComparator);
+        memTableSize = 0;
+
+        SstFileMetadata metadata = writeMemTableToSst(snapshot);
+
+        levels.get(0).add(0, metadata);
+
+        LOG.info(
+                "Flushed MemTable to L0 SST file: {}, entries: {}",
+                metadata.getFile().getName(),
+                snapshot.size());
+
+        compactor.maybeCompact(levels, MAX_LEVELS, this::newSstFile);
+    }
+
+    /**
+     * Force a full compaction of all levels into the deepest level. This 
merges all data and cleans
+     * up tombstones (which are only removed at the max level), reducing the 
total number of SST
+     * files to the minimum.
+     */
+    public void compact() throws IOException {
+        ensureOpen();
+        compactor.fullCompact(levels, MAX_LEVELS, this::newSstFile);
+    }
+
+    // 
-------------------------------------------------------------------------
+    //  Lifecycle
+    // 
-------------------------------------------------------------------------
+
+    @Override
+    public void close() throws IOException {
+        if (closed) {
+            return;
+        }
+        closed = true;
+
+        // Flush remaining MemTable data to L0
+        if (!memTable.isEmpty()) {
+            TreeMap<MemorySlice, byte[]> snapshot = memTable;
+            memTable = new TreeMap<>(keyComparator);
+            memTableSize = 0;
+
+            SstFileMetadata metadata = writeMemTableToSst(snapshot);
+            levels.get(0).add(0, metadata);
+        }
+
+        // Close all cached readers
+        for (SortLookupStoreReader reader : readerCache.values()) {
+            try {
+                reader.close();
+            } catch (IOException e) {
+                LOG.warn("Failed to close cached reader during shutdown", e);
+            }
+        }
+        readerCache.clear();
+
+        LOG.info("SimpleLsmKvDb closed. Level stats: {}", getLevelStats());
+    }
+
+    /** Return the total number of SST files across all levels. */
+    @VisibleForTesting
+    int getSstFileCount() {
+        int count = 0;
+        for (List<SstFileMetadata> levelFiles : levels) {
+            count += levelFiles.size();
+        }
+        return count;
+    }
+
+    /** Return the number of SST files at a specific level. */
+    public int getLevelFileCount(int level) {
+        if (level < 0 || level >= MAX_LEVELS) {
+            return 0;
+        }
+        return levels.get(level).size();
+    }
+
+    /** Return the estimated MemTable size in bytes. */
+    public long getMemTableSize() {
+        return memTableSize;
+    }
+
+    /** Return a human-readable summary of file counts per level. */
+    public String getLevelStats() {
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < MAX_LEVELS; i++) {
+            int count = levels.get(i).size();
+            if (count > 0) {
+                if (sb.length() > 0) {
+                    sb.append(", ");
+                }
+                sb.append("L").append(i).append("=").append(count);
+            }
+        }
+        return sb.length() == 0 ? "empty" : sb.toString();
+    }
+
+    // 
-------------------------------------------------------------------------
+    //  Internal Helpers
+    // 
-------------------------------------------------------------------------
+
+    private void maybeFlushMemTable() throws IOException {
+        if (memTableSize >= memTableFlushThreshold) {
+            flush();
+        }
+    }
+
+    @Nullable
+    private byte[] lookupInFile(File file, byte[] key) throws IOException {
+        SortLookupStoreReader reader = readerCache.get(file);
+        if (reader == null) {
+            reader = storeFactory.createReader(file);
+            readerCache.put(file, reader);
+        }
+        return reader.lookup(key);
+    }
+
+    @Nullable
+    private SstFileMetadata findFileForKey(List<SstFileMetadata> sortedFiles, 
MemorySlice key) {
+        int low = 0;
+        int high = sortedFiles.size() - 1;
+        while (low <= high) {
+            int mid = low + (high - low) / 2;
+            SstFileMetadata midFile = sortedFiles.get(mid);
+            if (keyComparator.compare(key, midFile.getMinKey()) < 0) {
+                high = mid - 1;
+            } else if (keyComparator.compare(key, midFile.getMaxKey()) > 0) {
+                low = mid + 1;
+            } else {
+                return midFile;
+            }
+        }
+        return null;
+    }
+
+    private SstFileMetadata writeMemTableToSst(TreeMap<MemorySlice, byte[]> 
data)
+            throws IOException {
+        File sstFile = newSstFile();
+        SortLookupStoreWriter writer = storeFactory.createWriter(sstFile, 
null);
+        MemorySlice minKey = null;
+        MemorySlice maxKey = null;
+        long tombstoneCount = 0;
+        try {
+            for (Map.Entry<MemorySlice, byte[]> entry : data.entrySet()) {
+                writer.put(entry.getKey().copyBytes(), entry.getValue());
+                if (minKey == null) {
+                    minKey = entry.getKey();
+                }
+                maxKey = entry.getKey();
+                if (isTombstone(entry.getValue())) {
+                    tombstoneCount++;
+                }
+            }
+        } finally {
+            writer.close();
+        }
+        return new SstFileMetadata(sstFile, minKey, maxKey, tombstoneCount, 0);
+    }
+
+    private File newSstFile() {
+        long sequence = fileSequence++;
+        return new File(dataDirectory, String.format("sst-%06d.db", sequence));
+    }
+
+    private void ensureOpen() {
+        if (closed) {
+            throw new IllegalStateException("SimpleLsmKvDb is already closed");
+        }
+    }
+
+    // 
-------------------------------------------------------------------------
+    //  Builder
+    // 
-------------------------------------------------------------------------
+
+    /** Builder for {@link SimpleLsmKvDb}. */
+    public static class Builder {
+
+        private final File dataDirectory;
+        private long memTableFlushThreshold = 64 * 1024 * 1024; // 64 MB
+        private long maxSstFileSize = 8 * 1024 * 1024; // 8 MB
+        private int blockSize = 32 * 1024; // 32 KB
+        private long cacheSize = 128 * 1024 * 1024; // 128 MB
+        private int level0FileNumCompactTrigger = 4;
+        private int sizeRatio = 10;
+        private CompressOptions compressOptions = 
CompressOptions.defaultOptions();
+        private Comparator<MemorySlice> keyComparator = MemorySlice::compareTo;
+
+        Builder(File dataDirectory) {
+            this.dataDirectory = dataDirectory;
+        }
+
+        /** Set the MemTable flush threshold in bytes. Default is 64 MB. */
+        public Builder memTableFlushThreshold(long thresholdBytes) {
+            this.memTableFlushThreshold = thresholdBytes;
+            return this;
+        }
+
+        /** Set the maximum SST file size produced by compaction in bytes. 
Default is 8 MB. */
+        public Builder maxSstFileSize(long maxSstFileSize) {
+            this.maxSstFileSize = maxSstFileSize;
+            return this;
+        }
+
+        /** Set the SST block size in bytes. Default is 32 KB. */
+        public Builder blockSize(int blockSize) {
+            this.blockSize = blockSize;
+            return this;
+        }
+
+        /** Set the block cache size in bytes. Default is 128 MB. */
+        public Builder cacheSize(long cacheSize) {
+            this.cacheSize = cacheSize;
+            return this;
+        }
+
+        /** Set the level 0 file number that triggers compaction. Default is 
4. */
+        public Builder level0FileNumCompactTrigger(int fileNum) {
+            this.level0FileNumCompactTrigger = fileNum;
+            return this;
+        }
+
+        /**
+         * Set the size ratio percentage for Universal Compaction. When the 
accumulated size of
+         * newer runs divided by the next run's size is less than this 
percentage, the runs are
+         * merged together. Default is 10 (meaning 10%).
+         */
+        public Builder sizeRatio(int sizeRatio) {
+            this.sizeRatio = sizeRatio;
+            return this;
+        }
+
+        /** Set compression options. Default is zstd level 1. */
+        public Builder compressOptions(CompressOptions compressOptions) {
+            this.compressOptions = compressOptions;
+            return this;
+        }
+
+        /**
+         * Set a custom key comparator. Default is unsigned lexicographic byte 
comparison.
+         *
+         * <p>The comparator must be consistent with the {@link 
SortLookupStoreFactory}'s comparator
+         * so that SST file lookups return correct results.
+         */
+        public Builder keyComparator(Comparator<MemorySlice> keyComparator) {
+            this.keyComparator = keyComparator;
+            return this;
+        }
+
+        /** Build the {@link SimpleLsmKvDb} instance. */
+        public SimpleLsmKvDb build() {
+            if (!dataDirectory.exists()) {
+                boolean created = dataDirectory.mkdirs();
+                if (!created) {
+                    throw new IllegalStateException(
+                            "Failed to create data directory: " + 
dataDirectory);
+                }
+            }
+
+            CacheManager cacheManager = new 
CacheManager(MemorySize.ofBytes(cacheSize));
+            SortLookupStoreFactory factory =
+                    new SortLookupStoreFactory(
+                            keyComparator, cacheManager, blockSize, 
compressOptions);
+
+            return new SimpleLsmKvDb(
+                    dataDirectory,
+                    factory,
+                    keyComparator,
+                    memTableFlushThreshold,
+                    maxSstFileSize,
+                    level0FileNumCompactTrigger,
+                    sizeRatio);
+        }
+    }
+
+    static boolean isTombstone(byte[] value) {
+        return value.length == 0;
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/SstFileMetadata.java
 
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/SstFileMetadata.java
new file mode 100644
index 0000000000..48e78d426c
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/SstFileMetadata.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.sort.db;
+
+import org.apache.paimon.memory.MemorySlice;
+
+import java.io.File;
+import java.util.Comparator;
+
+/**
+ * Metadata for an SST file, tracking its key range, size and level for 
efficient compaction and
+ * lookup.
+ */
+public final class SstFileMetadata {
+
+    private final File file;
+    private final MemorySlice minKey;
+    private final MemorySlice maxKey;
+    private final long fileSize;
+    private final long tombstoneCount;
+    private final int level;
+
+    public SstFileMetadata(
+            File file, MemorySlice minKey, MemorySlice maxKey, long 
tombstoneCount, int level) {
+        this(file, minKey, maxKey, file.length(), tombstoneCount, level);
+    }
+
+    public SstFileMetadata(
+            File file,
+            MemorySlice minKey,
+            MemorySlice maxKey,
+            long fileSize,
+            long tombstoneCount,
+            int level) {
+        this.file = file;
+        this.minKey = minKey;
+        this.maxKey = maxKey;
+        this.fileSize = fileSize;
+        this.tombstoneCount = tombstoneCount;
+        this.level = level;
+    }
+
+    public File getFile() {
+        return file;
+    }
+
+    public MemorySlice getMinKey() {
+        return minKey;
+    }
+
+    public MemorySlice getMaxKey() {
+        return maxKey;
+    }
+
+    public long getFileSize() {
+        return fileSize;
+    }
+
+    public int getLevel() {
+        return level;
+    }
+
+    public SstFileMetadata withLevel(int newLevel) {
+        return new SstFileMetadata(file, minKey, maxKey, fileSize, 
tombstoneCount, newLevel);
+    }
+
+    public boolean hasTombstones() {
+        return tombstoneCount > 0;
+    }
+
+    public boolean mightContainKey(MemorySlice key, Comparator<MemorySlice> 
comparator) {
+        return comparator.compare(key, minKey) >= 0 && comparator.compare(key, 
maxKey) <= 0;
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java 
b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java
index 613ffd80de..9fecb05ad1 100644
--- a/paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java
@@ -182,9 +182,11 @@ public class SstFileWriter {
    /**
     * Writes the accumulated index block to the file and returns its handle.
     *
     * <p>Also emits a single summary log line for the finished SST file (record count and
     * uncompressed/compressed byte sizes).
     */
    @Nullable
    public BlockHandle writeIndexBlock() throws IOException {
        BlockHandle indexBlock = writeBlock(indexBlockWriter);
        // One parameterized log statement instead of three separate INFO lines.
        LOG.info(
                "SST written: records={}, uncompressed={}, compressed={}",
                recordCount,
                MemorySize.ofBytes(totalUncompressedSize),
                MemorySize.ofBytes(totalCompressedSize));
        return indexBlock;
    }
 
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDbTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDbTest.java
new file mode 100644
index 0000000000..e3b67fd3cd
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDbTest.java
@@ -0,0 +1,1363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.sort.db;
+
+import org.apache.paimon.compression.CompressOptions;
+import org.apache.paimon.memory.MemorySlice;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Comparator;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/** Test for {@link SimpleLsmKvDb}. */
+public class SimpleLsmKvDbTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    private File dataDirectory;
+
    @BeforeEach
    public void setUp() {
        // Fresh DB directory per test; @TempDir guarantees isolation and cleanup.
        dataDirectory = new File(tempDir.toFile(), "test-db");
    }
+
    /**
     * Creates a DB with deliberately tiny flush threshold and block size so that flush and
     * compaction paths are exercised by small test workloads. Compression is disabled to keep
     * file contents deterministic.
     */
    private SimpleLsmKvDb createDb() {
        return SimpleLsmKvDb.builder(dataDirectory)
                .memTableFlushThreshold(1024)
                .blockSize(256)
                .cacheSize(4 * 1024 * 1024)
                .level0FileNumCompactTrigger(4)
                .compressOptions(new CompressOptions("none", 1))
                .build();
    }
+
+    @Test
+    public void testBasicPutAndGet() throws IOException {
+        try (SimpleLsmKvDb db = createDb()) {
+            putString(db, "key1", "value1");
+            putString(db, "key2", "value2");
+            putString(db, "key3", "value3");
+
+            Assertions.assertEquals("value1", getString(db, "key1"));
+            Assertions.assertEquals("value2", getString(db, "key2"));
+            Assertions.assertEquals("value3", getString(db, "key3"));
+            Assertions.assertNull(getString(db, "nonexistent"));
+        }
+    }
+
+    @Test
+    public void testOverwriteValue() throws IOException {
+        try (SimpleLsmKvDb db = createDb()) {
+            putString(db, "key1", "value1");
+            Assertions.assertEquals("value1", getString(db, "key1"));
+
+            putString(db, "key1", "value2");
+            Assertions.assertEquals("value2", getString(db, "key1"));
+
+            putString(db, "key1", "value3");
+            Assertions.assertEquals("value3", getString(db, "key1"));
+        }
+    }
+
+    @Test
+    public void testDelete() throws IOException {
+        try (SimpleLsmKvDb db = createDb()) {
+            putString(db, "key1", "value1");
+            putString(db, "key2", "value2");
+
+            Assertions.assertEquals("value1", getString(db, "key1"));
+
+            deleteString(db, "key1");
+            Assertions.assertNull(getString(db, "key1"));
+
+            // key2 should still exist
+            Assertions.assertEquals("value2", getString(db, "key2"));
+
+            // deleting a non-existent key should not cause errors
+            deleteString(db, "nonexistent");
+            Assertions.assertNull(getString(db, "nonexistent"));
+        }
+    }
+
    /** A key deleted and then re-put must resurface with the new value (tombstone superseded). */
    @Test
    public void testDeleteAndReinsert() throws IOException {
        try (SimpleLsmKvDb db = createDb()) {
            putString(db, "key1", "value1");
            Assertions.assertEquals("value1", getString(db, "key1"));

            deleteString(db, "key1");
            Assertions.assertNull(getString(db, "key1"));

            putString(db, "key1", "value2");
            Assertions.assertEquals("value2", getString(db, "key1"));
        }
    }
+
+    @Test
+    public void testFlushToSst() throws IOException {
+        try (SimpleLsmKvDb db = createDb()) {
+            Assertions.assertEquals(0, db.getSstFileCount());
+
+            // Write enough data to trigger a flush (threshold is 1024 bytes)
+            for (int i = 0; i < 100; i++) {
+                putString(db, String.format("key-%05d", i), 
String.format("value-%05d", i));
+            }
+
+            // After writing enough data, at least one SST file should exist
+            Assertions.assertTrue(
+                    db.getSstFileCount() > 0,
+                    "Expected at least one SST file after writing enough 
data");
+
+            // All data should still be readable
+            for (int i = 0; i < 100; i++) {
+                String expected = String.format("value-%05d", i);
+                String actual = getString(db, String.format("key-%05d", i));
+                Assertions.assertEquals(expected, actual, "Mismatch for key-" 
+ i);
+            }
+        }
+    }
+
    /** After an explicit flush, all reads must be served from SST files, not the MemTable. */
    @Test
    public void testReadFromSstAfterFlush() throws IOException {
        try (SimpleLsmKvDb db = createDb()) {
            putString(db, "alpha", "first");
            putString(db, "beta", "second");
            putString(db, "gamma", "third");

            // Force flush
            db.flush();
            Assertions.assertTrue(db.getSstFileCount() > 0);

            // Data should be readable from SST
            Assertions.assertEquals("first", getString(db, "alpha"));
            Assertions.assertEquals("second", getString(db, "beta"));
            Assertions.assertEquals("third", getString(db, "gamma"));
        }
    }
+
    /** A MemTable value must shadow an older SST value for the same key, and survive a flush. */
    @Test
    public void testOverwriteAcrossFlush() throws IOException {
        try (SimpleLsmKvDb db = createDb()) {
            putString(db, "key1", "old-value");
            db.flush();

            // Overwrite in MemTable (newer than SST)
            putString(db, "key1", "new-value");
            Assertions.assertEquals("new-value", getString(db, "key1"));

            // After another flush, the new value should persist
            db.flush();
            Assertions.assertEquals("new-value", getString(db, "key1"));
        }
    }
+
    /** A tombstone in the MemTable must hide an older SST value, and persist across a flush. */
    @Test
    public void testDeleteAcrossFlush() throws IOException {
        try (SimpleLsmKvDb db = createDb()) {
            putString(db, "key1", "value1");
            db.flush();

            // Delete in MemTable
            deleteString(db, "key1");
            Assertions.assertNull(getString(db, "key1"));

            // After flush, the tombstone should be in SST and key should still be deleted
            db.flush();
            Assertions.assertNull(getString(db, "key1"));
        }
    }
+
+    @Test
+    public void testCompaction() throws IOException {
+        // Use a low compaction threshold to trigger compaction easily
+        SimpleLsmKvDb db =
+                SimpleLsmKvDb.builder(dataDirectory)
+                        .memTableFlushThreshold(256)
+                        .blockSize(128)
+                        .cacheSize(4 * 1024 * 1024)
+                        .level0FileNumCompactTrigger(3)
+                        .compressOptions(new CompressOptions("none", 1))
+                        .build();
+
+        try {
+            // Write data in batches to create multiple SST files
+            for (int batch = 0; batch < 5; batch++) {
+                for (int i = 0; i < 20; i++) {
+                    int key = batch * 20 + i;
+                    putString(
+                            db,
+                            String.format("key-%05d", key),
+                            String.format("value-%05d-batch-%d", key, batch));
+                }
+                db.flush();
+            }
+
+            // Compaction may have been triggered automatically
+            // Verify all data is still accessible
+            for (int batch = 0; batch < 5; batch++) {
+                for (int i = 0; i < 20; i++) {
+                    int key = batch * 20 + i;
+                    String value = getString(db, String.format("key-%05d", 
key));
+                    Assertions.assertNotNull(
+                            value, "Key key-" + key + " should exist after 
compaction");
+                }
+            }
+        } finally {
+            db.close();
+        }
+    }
+
    /** A deleted key must stay deleted both before and after a full manual compaction. */
    @Test
    public void testCompactionRemovesTombstones() throws IOException {
        try (SimpleLsmKvDb db = createDb()) {
            putString(db, "key1", "value1");
            putString(db, "key2", "value2");
            putString(db, "key3", "value3");
            db.flush();

            deleteString(db, "key2");
            db.flush();

            // Before compaction, key2 should be deleted
            Assertions.assertNull(getString(db, "key2"));

            // Force compaction to push tombstones to max level where they are removed
            db.compact();

            // After full compaction, key2 should still be deleted
            Assertions.assertNull(getString(db, "key2"));
            Assertions.assertEquals("value1", getString(db, "key1"));
            Assertions.assertEquals("value3", getString(db, "key3"));
        }
    }
+
    /** Manual compact() must empty L0, keep at least one file, and preserve all values. */
    @Test
    public void testManualCompaction() throws IOException {
        try (SimpleLsmKvDb db = createDb()) {
            // Create multiple SST files in L0
            putString(db, "a", "1");
            db.flush();
            putString(db, "b", "2");
            db.flush();
            putString(db, "c", "3");
            db.flush();

            Assertions.assertEquals(3, db.getSstFileCount());
            Assertions.assertEquals(3, db.getLevelFileCount(0));

            db.compact();

            // After full compaction, all data should be in the deepest level
            Assertions.assertEquals(0, db.getLevelFileCount(0));
            Assertions.assertTrue(db.getSstFileCount() > 0);

            // All data should still be accessible
            Assertions.assertEquals("1", getString(db, "a"));
            Assertions.assertEquals("2", getString(db, "b"));
            Assertions.assertEquals("3", getString(db, "c"));
        }
    }
+
+    @Test
+    public void testLargeDataSet() throws IOException {
+        try (SimpleLsmKvDb db = createDb()) {
+            int recordCount = 1000;
+            for (int i = 0; i < recordCount; i++) {
+                putString(db, String.format("key-%08d", i), 
String.format("value-%08d", i));
+            }
+
+            // Verify all records
+            for (int i = 0; i < recordCount; i++) {
+                String expected = String.format("value-%08d", i);
+                String actual = getString(db, String.format("key-%08d", i));
+                Assertions.assertEquals(expected, actual, "Mismatch at index " 
+ i);
+            }
+        }
+    }
+
    /** Exercises the raw byte[] put/get API directly, bypassing the String helpers. */
    @Test
    public void testByteArrayKeyValue() throws IOException {
        try (SimpleLsmKvDb db = createDb()) {
            byte[] key = new byte[] {0x01, 0x02, 0x03};
            byte[] value = new byte[] {0x0A, 0x0B, 0x0C, 0x0D};

            db.put(key, value);

            byte[] result = db.get(key);
            Assertions.assertNotNull(result);
            Assertions.assertArrayEquals(value, result);
        }
    }
+
    /** close() must flush a non-empty MemTable to disk even when the auto-flush threshold was never hit. */
    @Test
    public void testCloseFlushesMemTable() throws IOException {
        File dbDir = new File(tempDir.toFile(), "close-test-db");
        // Write data and close
        SimpleLsmKvDb db =
                SimpleLsmKvDb.builder(dbDir)
                        .memTableFlushThreshold(1024 * 1024) // large threshold, won't auto-flush
                        .blockSize(256)
                        .cacheSize(4 * 1024 * 1024)
                        .level0FileNumCompactTrigger(10)
                        .compressOptions(new CompressOptions("none", 1))
                        .build();

        putString(db, "persist-key", "persist-value");
        Assertions.assertEquals(0, db.getSstFileCount());
        db.close();

        // After close, data should have been flushed to SST.
        // NOTE(review): this reads getSstFileCount() on a closed instance — assumes the file
        // count stays queryable after close() (put/get/delete throw, per
        // testClosedDbThrowsException); confirm against SimpleLsmKvDb's contract.
        Assertions.assertEquals(1, db.getSstFileCount());
    }
+
    /** After close(), put/get/delete must all fail fast with IllegalStateException. */
    @Test
    public void testClosedDbThrowsException() throws IOException {
        SimpleLsmKvDb db = createDb();
        db.close();

        Assertions.assertThrows(IllegalStateException.class, () -> putString(db, "key", "value"));
        Assertions.assertThrows(IllegalStateException.class, () -> getString(db, "key"));
        Assertions.assertThrows(IllegalStateException.class, () -> deleteString(db, "key"));
    }
+
+    @Test
+    public void testWithCompression() throws IOException {
+        SimpleLsmKvDb db =
+                SimpleLsmKvDb.builder(new File(tempDir.toFile(), 
"compressed-db"))
+                        .memTableFlushThreshold(512)
+                        .blockSize(256)
+                        .cacheSize(4 * 1024 * 1024)
+                        .level0FileNumCompactTrigger(4)
+                        .compressOptions(CompressOptions.defaultOptions())
+                        .build();
+
+        try {
+            for (int i = 0; i < 200; i++) {
+                putString(
+                        db,
+                        String.format("compressed-key-%05d", i),
+                        String.format("compressed-value-%05d", i));
+            }
+
+            for (int i = 0; i < 200; i++) {
+                String expected = String.format("compressed-value-%05d", i);
+                String actual = getString(db, 
String.format("compressed-key-%05d", i));
+                Assertions.assertEquals(expected, actual);
+            }
+        } finally {
+            db.close();
+        }
+    }
+
    /** A brand-new DB has no files, an empty MemTable, and returns null for any lookup. */
    @Test
    public void testEmptyDb() throws IOException {
        try (SimpleLsmKvDb db = createDb()) {
            Assertions.assertNull(getString(db, "any-key"));
            Assertions.assertEquals(0, db.getSstFileCount());
            Assertions.assertEquals(0, db.getMemTableSize());
        }
    }
+
    /** Flushing with nothing buffered must not create an (empty) SST file. */
    @Test
    public void testFlushEmptyMemTable() throws IOException {
        try (SimpleLsmKvDb db = createDb()) {
            // Flushing an empty MemTable should be a no-op
            db.flush();
            Assertions.assertEquals(0, db.getSstFileCount());
        }
    }
+
+    @Test
+    public void testUniversalCompactionTriggeredByRunCount() throws 
IOException {
+        // Compaction threshold = 3, so 3 sorted runs trigger compaction
+        SimpleLsmKvDb db =
+                SimpleLsmKvDb.builder(new File(tempDir.toFile(), 
"universal-trigger-db"))
+                        .memTableFlushThreshold(1024 * 1024)
+                        .blockSize(256)
+                        .cacheSize(4 * 1024 * 1024)
+                        .level0FileNumCompactTrigger(3)
+                        .compressOptions(new CompressOptions("none", 1))
+                        .build();
+
+        try {
+            // Create 3 sorted runs (L0 files) to trigger universal compaction
+            putString(db, "aaa", "v1");
+            db.flush();
+            Assertions.assertEquals(1, db.getLevelFileCount(0));
+
+            putString(db, "bbb", "v2");
+            db.flush();
+            Assertions.assertEquals(2, db.getLevelFileCount(0));
+
+            putString(db, "ccc", "v3");
+            db.flush();
+            // After 3rd flush, universal compaction should have been triggered
+            // L0 should be cleared and data moved to a deeper level
+            Assertions.assertEquals(0, db.getLevelFileCount(0));
+            Assertions.assertTrue(db.getSstFileCount() > 0);
+
+            // All data should be accessible
+            Assertions.assertEquals("v1", getString(db, "aaa"));
+            Assertions.assertEquals("v2", getString(db, "bbb"));
+            Assertions.assertEquals("v3", getString(db, "ccc"));
+        } finally {
+            db.close();
+        }
+    }
+
+    @Test
+    public void testUniversalCompactionWithOverlappingKeys() throws 
IOException {
+        SimpleLsmKvDb db =
+                SimpleLsmKvDb.builder(new File(tempDir.toFile(), 
"universal-overlap-db"))
+                        .memTableFlushThreshold(1024 * 1024)
+                        .blockSize(256)
+                        .cacheSize(4 * 1024 * 1024)
+                        .level0FileNumCompactTrigger(3)
+                        .compressOptions(new CompressOptions("none", 1))
+                        .build();
+
+        try {
+            // Write overlapping keys across multiple flushes
+            putString(db, "key-a", "old-a");
+            putString(db, "key-b", "old-b");
+            db.flush();
+
+            putString(db, "key-a", "new-a");
+            putString(db, "key-c", "new-c");
+            db.flush();
+
+            putString(db, "key-b", "new-b");
+            putString(db, "key-d", "new-d");
+            db.flush();
+            // Universal compaction should have been triggered
+
+            // Newer values should win
+            Assertions.assertEquals("new-a", getString(db, "key-a"));
+            Assertions.assertEquals("new-b", getString(db, "key-b"));
+            Assertions.assertEquals("new-c", getString(db, "key-c"));
+            Assertions.assertEquals("new-d", getString(db, "key-d"));
+        } finally {
+            db.close();
+        }
+    }
+
+    @Test
+    public void testUniversalCompactionReducesFileCount() throws IOException {
+        SimpleLsmKvDb db =
+                SimpleLsmKvDb.builder(new File(tempDir.toFile(), 
"universal-reduce-db"))
+                        .memTableFlushThreshold(1024 * 1024)
+                        .blockSize(256)
+                        .cacheSize(4 * 1024 * 1024)
+                        .level0FileNumCompactTrigger(3)
+                        .compressOptions(new CompressOptions("none", 1))
+                        .build();
+
+        try {
+            // Create 2 runs with overlapping keys (below threshold, no 
compaction)
+            putString(db, "shared-key", "v1");
+            putString(db, "a", "1");
+            db.flush();
+            putString(db, "shared-key", "v2");
+            putString(db, "b", "2");
+            db.flush();
+            int fileCountBeforeCompaction = db.getSstFileCount();
+            Assertions.assertEquals(2, fileCountBeforeCompaction);
+
+            // 3rd flush triggers compaction; overlapping files get merged, 
reducing count
+            putString(db, "shared-key", "v3");
+            putString(db, "c", "3");
+            db.flush();
+            int fileCountAfterCompaction = db.getSstFileCount();
+            Assertions.assertTrue(
+                    fileCountAfterCompaction <= fileCountBeforeCompaction,
+                    "Compaction should reduce or maintain file count, but got "
+                            + fileCountAfterCompaction);
+
+            // Data integrity — newest value wins
+            Assertions.assertEquals("v3", getString(db, "shared-key"));
+            Assertions.assertEquals("1", getString(db, "a"));
+            Assertions.assertEquals("2", getString(db, "b"));
+            Assertions.assertEquals("3", getString(db, "c"));
+        } finally {
+            db.close();
+        }
+    }
+
+    @Test
+    public void testUniversalCompactionMultipleRounds() throws IOException {
+        // Low threshold to trigger compaction frequently
+        SimpleLsmKvDb db =
+                SimpleLsmKvDb.builder(new File(tempDir.toFile(), 
"universal-multi-db"))
+                        .memTableFlushThreshold(512)
+                        .blockSize(128)
+                        .cacheSize(4 * 1024 * 1024)
+                        .level0FileNumCompactTrigger(3)
+                        .sizeRatio(50)
+                        .compressOptions(new CompressOptions("none", 1))
+                        .build();
+
+        try {
+            // Write many batches to trigger multiple rounds of compaction
+            int totalKeys = 500;
+            for (int i = 0; i < totalKeys; i++) {
+                putString(db, String.format("key-%06d", i), 
String.format("value-%06d", i));
+            }
+
+            // Verify all data is still correct after multiple compaction 
rounds
+            for (int i = 0; i < totalKeys; i++) {
+                String expected = String.format("value-%06d", i);
+                String actual = getString(db, String.format("key-%06d", i));
+                Assertions.assertEquals(
+                        expected, actual, "Mismatch for key-" + 
String.format("%06d", i));
+            }
+        } finally {
+            db.close();
+        }
+    }
+
+    @Test
+    public void testUniversalCompactionPreservesTombstonesInPartialMerge() 
throws IOException {
+        // Use a high compaction threshold so we can control when compaction 
happens
+        SimpleLsmKvDb db =
+                SimpleLsmKvDb.builder(new File(tempDir.toFile(), 
"universal-tombstone-db"))
+                        .memTableFlushThreshold(1024 * 1024)
+                        .blockSize(256)
+                        .cacheSize(4 * 1024 * 1024)
+                        .level0FileNumCompactTrigger(4)
+                        .compressOptions(new CompressOptions("none", 1))
+                        .build();
+
+        try {
+            // Create data and tombstones across multiple flushes
+            putString(db, "key-1", "value-1");
+            putString(db, "key-2", "value-2");
+            putString(db, "key-3", "value-3");
+            db.flush();
+
+            deleteString(db, "key-2");
+            db.flush();
+
+            putString(db, "key-4", "value-4");
+            db.flush();
+
+            // key-2 should be deleted (tombstone exists in SST)
+            Assertions.assertNull(getString(db, "key-2"));
+
+            // Now trigger compaction by adding one more flush
+            putString(db, "key-5", "value-5");
+            db.flush();
+            // 4 sorted runs should trigger compaction
+
+            // After compaction, deleted key should still be gone
+            Assertions.assertNull(getString(db, "key-2"));
+            Assertions.assertEquals("value-1", getString(db, "key-1"));
+            Assertions.assertEquals("value-3", getString(db, "key-3"));
+            Assertions.assertEquals("value-4", getString(db, "key-4"));
+            Assertions.assertEquals("value-5", getString(db, "key-5"));
+
+            // Full compaction should clean up tombstones completely
+            db.compact();
+            Assertions.assertNull(getString(db, "key-2"));
+            Assertions.assertEquals("value-1", getString(db, "key-1"));
+        } finally {
+            db.close();
+        }
+    }
+
+    @Test
+    public void testUniversalCompactionWithUpdatesAcrossRuns() throws 
IOException {
+        SimpleLsmKvDb db =
+                SimpleLsmKvDb.builder(new File(tempDir.toFile(), 
"universal-update-db"))
+                        .memTableFlushThreshold(1024 * 1024)
+                        .blockSize(256)
+                        .cacheSize(4 * 1024 * 1024)
+                        .level0FileNumCompactTrigger(3)
+                        .compressOptions(new CompressOptions("none", 1))
+                        .build();
+
+        try {
+            // Write same key across multiple sorted runs with different values
+            putString(db, "shared-key", "version-1");
+            db.flush();
+
+            putString(db, "shared-key", "version-2");
+            db.flush();
+
+            putString(db, "shared-key", "version-3");
+            db.flush();
+            // Compaction triggered
+
+            // The newest value should always win
+            Assertions.assertEquals("version-3", getString(db, "shared-key"));
+
+            // Write more updates and compact again
+            putString(db, "shared-key", "version-4");
+            db.flush();
+            putString(db, "shared-key", "version-5");
+            db.flush();
+            putString(db, "shared-key", "version-6");
+            db.flush();
+
+            Assertions.assertEquals("version-6", getString(db, "shared-key"));
+
+            // Full compaction should still preserve the latest value
+            db.compact();
+            Assertions.assertEquals("version-6", getString(db, "shared-key"));
+        } finally {
+            db.close();
+        }
+    }
+
+    @Test
+    public void testLargeScaleFlushCompactAndFullCompact() throws IOException {
+        // Very small thresholds to trigger flush, auto-compaction, and full 
compaction frequently
+        SimpleLsmKvDb db =
+                SimpleLsmKvDb.builder(new File(tempDir.toFile(), 
"large-scale-db"))
+                        .memTableFlushThreshold(256)
+                        .blockSize(64)
+                        .cacheSize(4 * 1024 * 1024)
+                        .level0FileNumCompactTrigger(3)
+                        .sizeRatio(20)
+                        .compressOptions(new CompressOptions("none", 1))
+                        .build();
+
+        try {
+            int totalKeys = 2000;
+
+            // Phase 1: Insert keys, triggering many flushes and 
auto-compactions
+            for (int i = 0; i < totalKeys; i++) {
+                putString(db, String.format("key-%08d", i), 
String.format("value-%08d", i));
+            }
+
+            // Verify all keys are readable after many auto-compactions
+            for (int i = 0; i < totalKeys; i++) {
+                String expected = String.format("value-%08d", i);
+                String actual = getString(db, String.format("key-%08d", i));
+                Assertions.assertEquals(
+                        expected, actual, "Phase 1 mismatch for key-" + 
String.format("%08d", i));
+            }
+
+            // Phase 2: Overwrite half the keys
+            for (int i = 0; i < totalKeys / 2; i++) {
+                putString(db, String.format("key-%08d", i), 
String.format("updated-%08d", i));
+            }
+
+            // Verify overwrites
+            for (int i = 0; i < totalKeys / 2; i++) {
+                String expected = String.format("updated-%08d", i);
+                String actual = getString(db, String.format("key-%08d", i));
+                Assertions.assertEquals(
+                        expected,
+                        actual,
+                        "Phase 2 overwrite mismatch for key-" + 
String.format("%08d", i));
+            }
+            for (int i = totalKeys / 2; i < totalKeys; i++) {
+                String expected = String.format("value-%08d", i);
+                String actual = getString(db, String.format("key-%08d", i));
+                Assertions.assertEquals(
+                        expected,
+                        actual,
+                        "Phase 2 original mismatch for key-" + 
String.format("%08d", i));
+            }
+
+            // Phase 3: Delete a quarter of the keys
+            for (int i = 0; i < totalKeys / 4; i++) {
+                deleteString(db, String.format("key-%08d", i));
+            }
+
+            // Verify deletes
+            for (int i = 0; i < totalKeys / 4; i++) {
+                Assertions.assertNull(
+                        getString(db, String.format("key-%08d", i)),
+                        "Phase 3 deleted key should be null: key-" + 
String.format("%08d", i));
+            }
+
+            // Phase 4: Full compaction — merges all runs and cleans tombstones
+            int fileCountBefore = db.getSstFileCount();
+            db.compact();
+            int fileCountAfter = db.getSstFileCount();
+
+            Assertions.assertTrue(
+                    fileCountAfter <= fileCountBefore,
+                    "Full compaction should reduce file count: before="
+                            + fileCountBefore
+                            + " after="
+                            + fileCountAfter);
+            Assertions.assertEquals(0, db.getLevelFileCount(0));
+
+            // Verify all data integrity after full compaction
+            for (int i = 0; i < totalKeys / 4; i++) {
+                Assertions.assertNull(
+                        getString(db, String.format("key-%08d", i)),
+                        "After compact, deleted key should be null: key-"
+                                + String.format("%08d", i));
+            }
+            for (int i = totalKeys / 4; i < totalKeys / 2; i++) {
+                Assertions.assertEquals(
+                        String.format("updated-%08d", i),
+                        getString(db, String.format("key-%08d", i)),
+                        "After compact, updated key mismatch: key-" + 
String.format("%08d", i));
+            }
+            for (int i = totalKeys / 2; i < totalKeys; i++) {
+                Assertions.assertEquals(
+                        String.format("value-%08d", i),
+                        getString(db, String.format("key-%08d", i)),
+                        "After compact, original key mismatch: key-" + 
String.format("%08d", i));
+            }
+
+            // Phase 5: Write more data after full compaction to ensure DB is 
still functional
+            for (int i = totalKeys; i < totalKeys + 500; i++) {
+                putString(db, String.format("key-%08d", i), 
String.format("new-%08d", i));
+            }
+
+            for (int i = totalKeys; i < totalKeys + 500; i++) {
+                Assertions.assertEquals(
+                        String.format("new-%08d", i),
+                        getString(db, String.format("key-%08d", i)),
+                        "Phase 5 new key mismatch: key-" + 
String.format("%08d", i));
+            }
+        } finally {
+            db.close();
+        }
+    }
+
+    @Test
+    public void testCompactRemovesTombstonesAndMerges() throws IOException {
+        try (SimpleLsmKvDb db = createDb()) {
+            // Create data across multiple flushes
+            for (int i = 0; i < 50; i++) {
+                putString(db, String.format("key-%05d", i), 
String.format("value-%05d", i));
+            }
+            db.flush();
+
+            for (int i = 25; i < 75; i++) {
+                putString(db, String.format("key-%05d", i), 
String.format("updated-%05d", i));
+            }
+            db.flush();
+
+            // Delete some keys
+            for (int i = 0; i < 10; i++) {
+                deleteString(db, String.format("key-%05d", i));
+            }
+            db.flush();
+
+            // Full compaction should merge everything and remove tombstones
+            db.compact();
+
+            // Verify results
+            for (int i = 0; i < 10; i++) {
+                Assertions.assertNull(
+                        getString(db, String.format("key-%05d", i)),
+                        "Deleted key should be null: key-" + i);
+            }
+            for (int i = 10; i < 25; i++) {
+                Assertions.assertEquals(
+                        String.format("value-%05d", i),
+                        getString(db, String.format("key-%05d", i)));
+            }
+            for (int i = 25; i < 75; i++) {
+                Assertions.assertEquals(
+                        String.format("updated-%05d", i),
+                        getString(db, String.format("key-%05d", i)));
+            }
+        }
+    }
+
+    @Test
+    public void testLevelStats() throws IOException {
+        SimpleLsmKvDb db =
+                SimpleLsmKvDb.builder(new File(tempDir.toFile(), "stats-db"))
+                        .memTableFlushThreshold(1024 * 1024)
+                        .blockSize(256)
+                        .cacheSize(4 * 1024 * 1024)
+                        .level0FileNumCompactTrigger(10)
+                        .compressOptions(new CompressOptions("none", 1))
+                        .build();
+
+        try {
+            Assertions.assertEquals("empty", db.getLevelStats());
+
+            putString(db, "a", "1");
+            db.flush();
+            Assertions.assertTrue(db.getLevelStats().contains("L0=1"));
+
+            putString(db, "b", "2");
+            db.flush();
+            Assertions.assertTrue(db.getLevelStats().contains("L0=2"));
+        } finally {
+            db.close();
+        }
+    }
+
+    @Test
+    public void testCustomComparatorReverseOrder() throws IOException {
+        // Reverse comparator: keys are ordered in descending byte order
+        Comparator<MemorySlice> reverseComparator =
+                new Comparator<MemorySlice>() {
+                    @Override
+                    public int compare(MemorySlice a, MemorySlice b) {
+                        return b.compareTo(a);
+                    }
+                };
+
+        SimpleLsmKvDb db =
+                SimpleLsmKvDb.builder(new File(tempDir.toFile(), "reverse-db"))
+                        .memTableFlushThreshold(1024 * 1024)
+                        .blockSize(256)
+                        .cacheSize(4 * 1024 * 1024)
+                        .level0FileNumCompactTrigger(3)
+                        .compressOptions(new CompressOptions("none", 1))
+                        .keyComparator(reverseComparator)
+                        .build();
+
+        try {
+            // Basic put/get with reverse comparator
+            putString(db, "aaa", "value-a");
+            putString(db, "bbb", "value-b");
+            putString(db, "ccc", "value-c");
+
+            Assertions.assertEquals("value-a", getString(db, "aaa"));
+            Assertions.assertEquals("value-b", getString(db, "bbb"));
+            Assertions.assertEquals("value-c", getString(db, "ccc"));
+
+            // Flush to SST and verify reads still work with reverse comparator
+            db.flush();
+            Assertions.assertEquals("value-a", getString(db, "aaa"));
+            Assertions.assertEquals("value-b", getString(db, "bbb"));
+            Assertions.assertEquals("value-c", getString(db, "ccc"));
+            Assertions.assertNull(getString(db, "ddd"));
+        } finally {
+            db.close();
+        }
+    }
+
+    @Test
+    public void testCustomComparatorWithCompaction() throws IOException {
+        // Reverse comparator to ensure compaction uses the custom comparator
+        Comparator<MemorySlice> reverseComparator =
+                new Comparator<MemorySlice>() {
+                    @Override
+                    public int compare(MemorySlice a, MemorySlice b) {
+                        return b.compareTo(a);
+                    }
+                };
+
+        SimpleLsmKvDb db =
+                SimpleLsmKvDb.builder(new File(tempDir.toFile(), 
"reverse-compact-db"))
+                        .memTableFlushThreshold(1024 * 1024)
+                        .blockSize(256)
+                        .cacheSize(4 * 1024 * 1024)
+                        .level0FileNumCompactTrigger(3)
+                        .compressOptions(new CompressOptions("none", 1))
+                        .keyComparator(reverseComparator)
+                        .build();
+
+        try {
+            // Create multiple L0 files with overlapping keys to trigger 
compaction
+            putString(db, "key-a", "old-a");
+            putString(db, "key-b", "old-b");
+            db.flush();
+
+            putString(db, "key-a", "new-a");
+            putString(db, "key-c", "new-c");
+            db.flush();
+
+            putString(db, "key-b", "new-b");
+            putString(db, "key-d", "new-d");
+            db.flush();
+            // Compaction should have been triggered (threshold = 3)
+
+            // Newer values should win after compaction with reverse comparator
+            Assertions.assertEquals("new-a", getString(db, "key-a"));
+            Assertions.assertEquals("new-b", getString(db, "key-b"));
+            Assertions.assertEquals("new-c", getString(db, "key-c"));
+            Assertions.assertEquals("new-d", getString(db, "key-d"));
+        } finally {
+            db.close();
+        }
+    }
+
+    @Test
+    public void testCustomComparatorDeleteAcrossFlushAndCompact() throws 
IOException {
+        // Reverse comparator
+        Comparator<MemorySlice> reverseComparator =
+                new Comparator<MemorySlice>() {
+                    @Override
+                    public int compare(MemorySlice a, MemorySlice b) {
+                        return b.compareTo(a);
+                    }
+                };
+
+        SimpleLsmKvDb db =
+                SimpleLsmKvDb.builder(new File(tempDir.toFile(), 
"reverse-delete-db"))
+                        .memTableFlushThreshold(1024 * 1024)
+                        .blockSize(256)
+                        .cacheSize(4 * 1024 * 1024)
+                        .level0FileNumCompactTrigger(4)
+                        .compressOptions(new CompressOptions("none", 1))
+                        .keyComparator(reverseComparator)
+                        .build();
+
+        try {
+            putString(db, "key-x", "value-x");
+            putString(db, "key-y", "value-y");
+            putString(db, "key-z", "value-z");
+            db.flush();
+
+            // Delete key-y via tombstone
+            deleteString(db, "key-y");
+            db.flush();
+
+            Assertions.assertNull(getString(db, "key-y"));
+            Assertions.assertEquals("value-x", getString(db, "key-x"));
+            Assertions.assertEquals("value-z", getString(db, "key-z"));
+
+            // Compaction should clean up tombstones
+            db.compact();
+
+            Assertions.assertNull(getString(db, "key-y"));
+            Assertions.assertEquals("value-x", getString(db, "key-x"));
+            Assertions.assertEquals("value-z", getString(db, "key-z"));
+        } finally {
+            db.close();
+        }
+    }
+
+    // ---- Tests for group-based merge optimization ----
+
+    @Test
+    public void testNonOverlappingFilesSkipMerge() throws IOException {
+        // Non-overlapping files should be kept as-is during compaction 
(skipped groups).
+        SimpleLsmKvDb db =
+                SimpleLsmKvDb.builder(new File(tempDir.toFile(), 
"non-overlap-skip-db"))
+                        .memTableFlushThreshold(1024 * 1024)
+                        .blockSize(256)
+                        .cacheSize(4 * 1024 * 1024)
+                        .level0FileNumCompactTrigger(3)
+                        .compressOptions(new CompressOptions("none", 1))
+                        .build();
+
+        try {
+            // Each flush creates a file with a distinct, non-overlapping key 
range
+            putString(db, "aaa", "1");
+            db.flush();
+            putString(db, "mmm", "2");
+            db.flush();
+            putString(db, "zzz", "3");
+            db.flush(); // triggers compaction (threshold = 3)
+
+            // Data integrity must be preserved
+            Assertions.assertEquals("1", getString(db, "aaa"));
+            Assertions.assertEquals("2", getString(db, "mmm"));
+            Assertions.assertEquals("3", getString(db, "zzz"));
+        } finally {
+            db.close();
+        }
+    }
+
+    @Test
+    public void testOverlappingFilesAreMergedInGroups() throws IOException {
+        // Files with overlapping key ranges should be merged within the same 
group.
+        SimpleLsmKvDb db =
+                SimpleLsmKvDb.builder(new File(tempDir.toFile(), 
"overlap-group-db"))
+                        .memTableFlushThreshold(1024 * 1024)
+                        .blockSize(256)
+                        .cacheSize(4 * 1024 * 1024)
+                        .level0FileNumCompactTrigger(3)
+                        .compressOptions(new CompressOptions("none", 1))
+                        .build();
+
+        try {
+            // All three flushes share "shared-key", so all files overlap
+            putString(db, "shared-key", "v1");
+            putString(db, "aaa", "a1");
+            db.flush();
+            putString(db, "shared-key", "v2");
+            putString(db, "bbb", "b1");
+            db.flush();
+            putString(db, "shared-key", "v3");
+            putString(db, "ccc", "c1");
+            db.flush(); // triggers compaction
+
+            // Newest value wins for the shared key
+            Assertions.assertEquals("v3", getString(db, "shared-key"));
+            Assertions.assertEquals("a1", getString(db, "aaa"));
+            Assertions.assertEquals("b1", getString(db, "bbb"));
+            Assertions.assertEquals("c1", getString(db, "ccc"));
+        } finally {
+            db.close();
+        }
+    }
+
+    @Test
+    public void testMixedOverlapAndNonOverlapGroups() throws IOException {
+        // Some files overlap (forming one group) while others don't (separate 
groups).
+        SimpleLsmKvDb db =
+                SimpleLsmKvDb.builder(new File(tempDir.toFile(), 
"mixed-group-db"))
+                        .memTableFlushThreshold(1024 * 1024)
+                        .blockSize(256)
+                        .cacheSize(4 * 1024 * 1024)
+                        .level0FileNumCompactTrigger(4)
+                        .compressOptions(new CompressOptions("none", 1))
+                        .build();
+
+        try {
+            // Run 1: keys a-c (overlaps with run 2)
+            putString(db, "a", "a1");
+            putString(db, "b", "b1");
+            putString(db, "c", "c1");
+            db.flush();
+
+            // Run 2: keys b-d (overlaps with run 1, forms one group)
+            putString(db, "b", "b2");
+            putString(db, "d", "d1");
+            db.flush();
+
+            // Run 3: keys x-z (no overlap with runs 1-2, separate group)
+            putString(db, "x", "x1");
+            putString(db, "y", "y1");
+            putString(db, "z", "z1");
+            db.flush();
+
+            // Run 4: key m (no overlap, separate group) — triggers compaction
+            putString(db, "m", "m1");
+            db.flush();
+
+            // Verify data integrity: overlapping keys use newest value
+            Assertions.assertEquals("a1", getString(db, "a"));
+            Assertions.assertEquals("b2", getString(db, "b")); // newest wins
+            Assertions.assertEquals("c1", getString(db, "c"));
+            Assertions.assertEquals("d1", getString(db, "d"));
+            Assertions.assertEquals("m1", getString(db, "m"));
+            Assertions.assertEquals("x1", getString(db, "x"));
+            Assertions.assertEquals("y1", getString(db, "y"));
+            Assertions.assertEquals("z1", getString(db, "z"));
+        } finally {
+            db.close();
+        }
+    }
+
+    @Test
+    public void testTombstoneFileNotSkippedDuringFullCompact() throws 
IOException {
+        // A non-overlapping file with tombstones should NOT be skipped during 
full compaction
+        // (dropTombstones=true), ensuring tombstones are cleaned up.
+        SimpleLsmKvDb db =
+                SimpleLsmKvDb.builder(new File(tempDir.toFile(), 
"tombstone-no-skip-db"))
+                        .memTableFlushThreshold(1024 * 1024)
+                        .blockSize(256)
+                        .cacheSize(4 * 1024 * 1024)
+                        .level0FileNumCompactTrigger(3)
+                        .compressOptions(new CompressOptions("none", 1))
+                        .build();
+
+        try {
+            putString(db, "key1", "value1");
+            putString(db, "key2", "value2");
+            db.flush();
+
+            // Delete key1 — creates a tombstone
+            deleteString(db, "key1");
+            db.flush();
+
+            // Full compaction should process the tombstone file even if it 
doesn't overlap
+            db.compact();
+
+            Assertions.assertNull(getString(db, "key1"));
+            Assertions.assertEquals("value2", getString(db, "key2"));
+        } finally {
+            db.close();
+        }
+    }
+
+    @Test
+    public void testGroupMergeWithMultipleCompactionRounds() throws 
IOException {
+        // Multiple rounds of compaction with mixed 
overlapping/non-overlapping data.
+        SimpleLsmKvDb db =
+                SimpleLsmKvDb.builder(new File(tempDir.toFile(), 
"multi-round-group-db"))
+                        .memTableFlushThreshold(1024 * 1024)
+                        .blockSize(256)
+                        .cacheSize(4 * 1024 * 1024)
+                        .level0FileNumCompactTrigger(3)
+                        .compressOptions(new CompressOptions("none", 1))
+                        .build();
+
+        try {
+            // Round 1: non-overlapping keys
+            putString(db, "a", "a1");
+            db.flush();
+            putString(db, "m", "m1");
+            db.flush();
+            putString(db, "z", "z1");
+            db.flush(); // triggers compaction
+
+            // Round 2: overlapping updates
+            putString(db, "a", "a2");
+            db.flush();
+            putString(db, "a", "a3");
+            putString(db, "b", "b1");
+            db.flush();
+            putString(db, "c", "c1");
+            db.flush(); // triggers compaction again
+
+            // Full compaction to consolidate everything
+            db.compact();
+
+            Assertions.assertEquals("a3", getString(db, "a"));
+            Assertions.assertEquals("b1", getString(db, "b"));
+            Assertions.assertEquals("c1", getString(db, "c"));
+            Assertions.assertEquals("m1", getString(db, "m"));
+            Assertions.assertEquals("z1", getString(db, "z"));
+        } finally {
+            db.close();
+        }
+    }
+
+    @Test
+    public void testCompactionMergesAllL0RunsAndIncludesL1() throws 
IOException {
+        // Verify that size-ratio compaction always merges all L0 files and L1 
data,
+        // so the total sorted run count never exceeds maxLevels.
+        SimpleLsmKvDb db =
+                SimpleLsmKvDb.builder(new File(tempDir.toFile(), 
"l0-clear-db"))
+                        .memTableFlushThreshold(1024 * 1024)
+                        .blockSize(256)
+                        .cacheSize(4 * 1024 * 1024)
+                        .level0FileNumCompactTrigger(3)
+                        .sizeRatio(1)
+                        .compressOptions(new CompressOptions("none", 1))
+                        .build();
+
+        try {
+            // Step 1: Create 3 L0 files to trigger first compaction
+            putString(db, "aaa", "v1");
+            db.flush();
+            putString(db, "bbb", "v2");
+            db.flush();
+            putString(db, "ccc", "v3");
+            db.flush();
+            // After compaction, data should exist in SST files
+            Assertions.assertTrue(
+                    db.getSstFileCount() > 0, "SST files should exist after 
compaction");
+
+            // Step 2: Create 3 more L0 files to trigger second compaction
+            // Now deeper levels already have data from the first compaction.
+            // The fix ensures L1 is included in the merge, preventing 
overflow.
+            putString(db, "ddd", "v4");
+            db.flush();
+            putString(db, "eee", "v5");
+            db.flush();
+            putString(db, "fff", "v6");
+            db.flush();
+
+            // All data should be accessible after multiple compaction rounds
+            Assertions.assertEquals("v1", getString(db, "aaa"));
+            Assertions.assertEquals("v2", getString(db, "bbb"));
+            Assertions.assertEquals("v3", getString(db, "ccc"));
+            Assertions.assertEquals("v4", getString(db, "ddd"));
+            Assertions.assertEquals("v5", getString(db, "eee"));
+            Assertions.assertEquals("v6", getString(db, "fff"));
+
+            // Total file count across all levels should be reasonable (no 
overflow)
+            int totalFiles = db.getSstFileCount();
+            Assertions.assertTrue(
+                    totalFiles > 0 && totalFiles < 20,
+                    "File count should be reasonable, got: " + totalFiles);
+        } finally {
+            db.close();
+        }
+    }
+
+    @Test
+    public void testCompactionWithManyRoundsNoOverflow() throws IOException {
+        // Stress test: many rounds of flush + compaction to ensure no 
overflow ever occurs
+        // and data integrity is maintained throughout.
+        SimpleLsmKvDb db =
+                SimpleLsmKvDb.builder(new File(tempDir.toFile(), "many-l0-db"))
+                        .memTableFlushThreshold(1024 * 1024)
+                        .blockSize(256)
+                        .cacheSize(4 * 1024 * 1024)
+                        .level0FileNumCompactTrigger(3)
+                        .sizeRatio(1)
+                        .compressOptions(new CompressOptions("none", 1))
+                        .build();
+
+        try {
+            int totalRounds = 10;
+            for (int round = 0; round < totalRounds; round++) {
+                for (int i = 0; i < 3; i++) {
+                    String key = String.format("round-%02d-key-%02d", round, 
i);
+                    String value = String.format("value-%02d-%02d", round, i);
+                    putString(db, key, value);
+                    db.flush();
+                }
+                // After each batch of 3 flushes, compaction should have been 
triggered.
+                // Verify that the number of occupied levels is within bounds.
+                int occupiedLevels = 0;
+                for (int level = 0; level < SimpleLsmKvDb.MAX_LEVELS; level++) 
{
+                    if (db.getLevelFileCount(level) > 0) {
+                        occupiedLevels++;
+                    }
+                }
+                Assertions.assertTrue(
+                        occupiedLevels <= SimpleLsmKvDb.MAX_LEVELS,
+                        "Occupied levels should not exceed MAX_LEVELS in round 
"
+                                + round
+                                + ", got: "
+                                + occupiedLevels);
+            }
+
+            // Verify all data is still accessible
+            for (int round = 0; round < totalRounds; round++) {
+                for (int i = 0; i < 3; i++) {
+                    String key = String.format("round-%02d-key-%02d", round, 
i);
+                    String expected = String.format("value-%02d-%02d", round, 
i);
+                    Assertions.assertEquals(expected, getString(db, key), 
"Mismatch for " + key);
+                }
+            }
+        } finally {
+            db.close();
+        }
+    }
+
+    @Test
+    public void testCompactionWithOverlappingKeysAcrossL0AndL1() throws 
IOException {
+        // Verify that when L0 files have overlapping keys with existing level 
data,
+        // compaction correctly deduplicates and the newest value wins.
+        SimpleLsmKvDb db =
+                SimpleLsmKvDb.builder(new File(tempDir.toFile(), 
"overlap-l0-l1-db"))
+                        .memTableFlushThreshold(1024 * 1024)
+                        .blockSize(256)
+                        .cacheSize(4 * 1024 * 1024)
+                        .level0FileNumCompactTrigger(3)
+                        .sizeRatio(1)
+                        .compressOptions(new CompressOptions("none", 1))
+                        .build();
+
+        try {
+            // First round: write and compact to push data into deeper levels
+            putString(db, "shared", "version-1");
+            putString(db, "only-old", "old-value");
+            db.flush();
+            putString(db, "aaa", "a1");
+            db.flush();
+            putString(db, "bbb", "b1");
+            db.flush();
+            // Compaction triggered
+
+            // Second round: overwrite "shared" key and trigger compaction 
again
+            // This forces merge with overlapping keys across levels
+            putString(db, "shared", "version-2");
+            db.flush();
+            putString(db, "ccc", "c1");
+            db.flush();
+            putString(db, "ddd", "d1");
+            db.flush();
+            // Compaction triggered again
+
+            // Newest value should win
+            Assertions.assertEquals("version-2", getString(db, "shared"));
+            Assertions.assertEquals("old-value", getString(db, "only-old"));
+            Assertions.assertEquals("a1", getString(db, "aaa"));
+            Assertions.assertEquals("b1", getString(db, "bbb"));
+            Assertions.assertEquals("c1", getString(db, "ccc"));
+            Assertions.assertEquals("d1", getString(db, "ddd"));
+
+            // Third round: overwrite again and verify
+            putString(db, "shared", "version-3");
+            db.flush();
+            putString(db, "eee", "e1");
+            db.flush();
+            putString(db, "fff", "f1");
+            db.flush();
+
+            Assertions.assertEquals("version-3", getString(db, "shared"));
+            Assertions.assertEquals("old-value", getString(db, "only-old"));
+            Assertions.assertEquals("e1", getString(db, "eee"));
+            Assertions.assertEquals("f1", getString(db, "fff"));
+        } finally {
+            db.close();
+        }
+    }
+
+    @Test
+    public void testGroupMergePreservesDeleteSemantics() throws IOException {
+        // Ensure that deletes are correctly handled across group-based merge.
+        SimpleLsmKvDb db =
+                SimpleLsmKvDb.builder(new File(tempDir.toFile(), 
"group-delete-db"))
+                        .memTableFlushThreshold(1024 * 1024)
+                        .blockSize(256)
+                        .cacheSize(4 * 1024 * 1024)
+                        .level0FileNumCompactTrigger(3)
+                        .compressOptions(new CompressOptions("none", 1))
+                        .build();
+
+        try {
+            // Write keys in different ranges
+            putString(db, "aaa", "v1");
+            putString(db, "bbb", "v2");
+            db.flush();
+
+            putString(db, "xxx", "v3");
+            putString(db, "yyy", "v4");
+            db.flush();
+
+            // Delete one key from each range
+            deleteString(db, "aaa");
+            deleteString(db, "xxx");
+            db.flush(); // triggers compaction
+
+            Assertions.assertNull(getString(db, "aaa"));
+            Assertions.assertEquals("v2", getString(db, "bbb"));
+            Assertions.assertNull(getString(db, "xxx"));
+            Assertions.assertEquals("v4", getString(db, "yyy"));
+
+            // Full compaction should clean up tombstones
+            db.compact();
+
+            Assertions.assertNull(getString(db, "aaa"));
+            Assertions.assertEquals("v2", getString(db, "bbb"));
+            Assertions.assertNull(getString(db, "xxx"));
+            Assertions.assertEquals("v4", getString(db, "yyy"));
+        } finally {
+            db.close();
+        }
+    }
+
+    private static void putString(SimpleLsmKvDb db, String key, String value) 
throws IOException {
+        db.put(key.getBytes(UTF_8), value.getBytes(UTF_8));
+    }
+
+    private static String getString(SimpleLsmKvDb db, String key) throws 
IOException {
+        byte[] bytes = db.get(key.getBytes(UTF_8));
+        if (bytes == null) {
+            return null;
+        }
+        return new String(bytes, UTF_8);
+    }
+
+    private static void deleteString(SimpleLsmKvDb db, String key) throws 
IOException {
+        db.delete(key.getBytes(UTF_8));
+    }
+}

Reply via email to