This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b110f6ed0a [core] Introduce SimpleLsmKvDb to put and get based on
Paimon SST (#7407)
b110f6ed0a is described below
commit b110f6ed0abb24ade24129a744d9455791328233
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Mar 12 14:55:01 2026 +0800
[core] Introduce SimpleLsmKvDb to put and get based on Paimon SST (#7407)
---
.../apache/paimon/lookup/sort/db/LsmCompactor.java | 733 +++++++++++
.../paimon/lookup/sort/db/SimpleLsmKvDb.java | 573 ++++++++
.../paimon/lookup/sort/db/SstFileMetadata.java | 90 ++
.../java/org/apache/paimon/sst/SstFileWriter.java | 8 +-
.../paimon/lookup/sort/db/SimpleLsmKvDbTest.java | 1363 ++++++++++++++++++++
5 files changed, 2764 insertions(+), 3 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/LsmCompactor.java
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/LsmCompactor.java
new file mode 100644
index 0000000000..64e9cff92a
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/LsmCompactor.java
@@ -0,0 +1,733 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.sort.db;
+
+import org.apache.paimon.lookup.sort.SortLookupStoreFactory;
+import org.apache.paimon.lookup.sort.SortLookupStoreReader;
+import org.apache.paimon.lookup.sort.SortLookupStoreWriter;
+import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.sst.BlockIterator;
+import org.apache.paimon.sst.SstFileReader;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+import static org.apache.paimon.lookup.sort.db.SimpleLsmKvDb.isTombstone;
+
+/**
+ * Handles Universal Compaction for the LSM-Tree, inspired by RocksDB's
Universal Compaction.
+ *
+ * <p>Universal Compaction treats all SST files as a flat list of sorted runs
ordered from newest to
+ * oldest. Instead of compacting level-by-level, it picks contiguous runs to
merge based on size
+ * ratios between adjacent runs.
+ *
+ * <p>Compaction trigger: when the number of level-0 files reaches {@code
+ * level0FileNumCompactTrigger}.
+ *
+ * <p>Run selection (from newest to oldest):
+ *
+ * <ol>
+ * <li><b>Size-ratio based</b>: Starting from the newest run, accumulate
runs as long as {@code
+ * accumulatedSize / nextRunSize < sizeRatio%}. If at least 2 runs are
accumulated, merge
+ * them.
+ * <li><b>Fallback</b>: If size-ratio selection finds fewer than 2
candidates, merge all runs.
+ * </ol>
+ *
+ * <p>Tombstones (empty values) are only removed when all runs are being
merged (equivalent to
+ * compacting to the bottom level).
+ */
public class LsmCompactor {

    private static final Logger LOG = LoggerFactory.getLogger(LsmCompactor.class);

    // Defines the key ordering shared by all SST files; every merge/grouping decision uses it.
    private final Comparator<MemorySlice> keyComparator;
    // Creates SST readers for merge inputs and writers for merge outputs.
    private final SortLookupStoreFactory storeFactory;
    // Target size (bytes) of each compaction output file; the writer rolls over once reached.
    private final long maxOutputFileSize;
    // Number of level-0 files that triggers a compaction in maybeCompact().
    private final int level0FileNumCompactTrigger;
    // Size-ratio threshold in percent used by the size-ratio candidate selection.
    private final int sizeRatioPercent;
    // Callback invoked to remove obsolete input files after a successful merge.
    private final FileDeleter fileDeleter;

    /**
     * Creates a compactor.
     *
     * @param keyComparator comparator defining the global key order of SST files
     * @param storeFactory factory for SST readers and writers
     * @param maxOutputFileSize target size in bytes of each compaction output file
     * @param level0FileNumCompactTrigger level-0 file count that triggers compaction
     * @param sizeRatioPercent size-ratio threshold (percent) for run selection
     * @param fileDeleter callback used to delete obsolete SST files
     */
    public LsmCompactor(
            Comparator<MemorySlice> keyComparator,
            SortLookupStoreFactory storeFactory,
            long maxOutputFileSize,
            int level0FileNumCompactTrigger,
            int sizeRatioPercent,
            FileDeleter fileDeleter) {
        this.keyComparator = keyComparator;
        this.storeFactory = storeFactory;
        this.maxOutputFileSize = maxOutputFileSize;
        this.level0FileNumCompactTrigger = level0FileNumCompactTrigger;
        this.sizeRatioPercent = sizeRatioPercent;
        this.fileDeleter = fileDeleter;
    }
+
    /**
     * Check if compaction is needed and perform it if so.
     *
     * <p>All sorted runs are collected from all levels into a flat list ordered newest-first. If
     * the number of level-0 files reaches {@code level0FileNumCompactTrigger}, a compaction is
     * triggered.
     *
     * <p>When the size-ratio branch is chosen, the candidate range is guaranteed to include all
     * level-0 runs and the level-1 run (if present). This ensures level 0 is fully cleared after
     * compaction and the merged result can be placed into level 1 without overflow.
     *
     * @param levels the multi-level SST file storage (used as a flat sorted-run list)
     * @param maxLevels the maximum number of levels (only level 0 is used for sorted runs)
     * @param fileSupplier supplier for new SST file paths
     */
    public void maybeCompact(
            List<List<SstFileMetadata>> levels, int maxLevels, FileSupplier fileSupplier)
            throws IOException {
        int levelZeroFileCount = levels.get(0).size();
        if (levelZeroFileCount < level0FileNumCompactTrigger) {
            return;
        }

        List<SortedRun> sortedRuns = collectSortedRuns(levels, maxLevels);

        LOG.info(
                "Universal compaction triggered: {} L0 files (threshold: {}), {} total sorted runs",
                levelZeroFileCount,
                level0FileNumCompactTrigger,
                sortedRuns.size());

        // Try size-ratio based selection: accumulate runs from newest (index 0) to oldest
        int candidateEnd = pickSizeRatioCandidates(sortedRuns);

        // Ensure all L0 runs and the L1 run (if present) are included in the merge.
        // sortedRuns is ordered newest-first, so L0 runs (and then the L1 run) form a prefix;
        // the loop stops at the first run whose files live at level 2 or deeper.
        int minCandidateEnd = 0;
        for (SortedRun run : sortedRuns) {
            if (!run.files.isEmpty() && run.files.get(0).getLevel() <= 1) {
                minCandidateEnd++;
            } else {
                break;
            }
        }
        candidateEnd = Math.max(candidateEnd, minCandidateEnd);

        if (candidateEnd >= 2) {
            // Merge runs [0, candidateEnd)
            LOG.info(
                    "Size-ratio compaction: merging {} newest runs out of {}",
                    candidateEnd,
                    sortedRuns.size());
            List<SortedRun> toMerge = new ArrayList<>(sortedRuns.subList(0, candidateEnd));
            List<SortedRun> remaining =
                    new ArrayList<>(sortedRuns.subList(candidateEnd, sortedRuns.size()));
            // Tombstones may only be dropped when no older data remains underneath.
            boolean dropTombstones = remaining.isEmpty();

            // Find the highest level not occupied by remaining runs
            int outputLevel = findHighestFreeLevel(levels, maxLevels, toMerge);
            MergeResult mergeResult =
                    mergeSortedRuns(toMerge, dropTombstones, fileSupplier, outputLevel);

            // Clear levels used by merged runs and place merged result
            clearLevelsOfRuns(levels, toMerge);
            levels.get(outputLevel).addAll(mergeResult.mergedRun.files);
            deleteOldFiles(toMerge, mergeResult.skippedFiles);
        } else {
            // Fallback: merge all runs
            LOG.info("Fallback full compaction: merging all {} runs", sortedRuns.size());
            mergeAllRuns(levels, maxLevels, sortedRuns, fileSupplier);
        }
    }
+
+ /**
+ * Force a full compaction of all sorted runs into a single run.
+ *
+ * @param levels the multi-level SST file storage
+ * @param maxLevels the maximum number of levels
+ * @param fileSupplier supplier for new SST file paths
+ */
+ public void fullCompact(
+ List<List<SstFileMetadata>> levels, int maxLevels, FileSupplier
fileSupplier)
+ throws IOException {
+ List<SortedRun> sortedRuns = collectSortedRuns(levels, maxLevels);
+ if (sortedRuns.size() <= 1) {
+ return;
+ }
+
+ LOG.info("Full compaction: merging all {} sorted runs",
sortedRuns.size());
+ mergeAllRuns(levels, maxLevels, sortedRuns, fileSupplier);
+ }
+
+ /**
+ * Merge all sorted runs into a single run, placing the result at the
highest level. Tombstones
+ * are dropped since there are no older runs below.
+ */
+ private void mergeAllRuns(
+ List<List<SstFileMetadata>> levels,
+ int maxLevels,
+ List<SortedRun> sortedRuns,
+ FileSupplier fileSupplier)
+ throws IOException {
+ int outputLevel = maxLevels - 1;
+ MergeResult mergeResult = mergeSortedRuns(sortedRuns, true,
fileSupplier, outputLevel);
+
+ for (int i = 0; i < maxLevels; i++) {
+ levels.get(i).clear();
+ }
+ levels.get(outputLevel).addAll(mergeResult.mergedRun.files);
+ deleteOldFiles(sortedRuns, mergeResult.skippedFiles);
+ }
+
+ //
-------------------------------------------------------------------------
+ // Universal Compaction internals
+ //
-------------------------------------------------------------------------
+
+ /**
+ * Collect all SST files into a flat list of sorted runs, ordered
newest-first.
+ *
+ * <p>Level 0 files are each treated as an individual sorted run (keys may
overlap). Level 1+
+ * files within the same level form a single sorted run (keys do not
overlap within a level).
+ */
+ private List<SortedRun> collectSortedRuns(List<List<SstFileMetadata>>
levels, int maxLevels) {
+ List<SortedRun> runs = new ArrayList<>();
+
+ // Level 0: each file is its own sorted run, newest first (L0 files
are stored newest-first)
+ List<SstFileMetadata> levelZeroFiles = levels.get(0);
+ for (SstFileMetadata file : levelZeroFiles) {
+ List<SstFileMetadata> singleFile = new ArrayList<>();
+ singleFile.add(file);
+ runs.add(new SortedRun(singleFile));
+ }
+
+ // Level 1+: all files in a level form one sorted run
+ for (int level = 1; level < maxLevels; level++) {
+ List<SstFileMetadata> levelFiles = levels.get(level);
+ if (!levelFiles.isEmpty()) {
+ runs.add(new SortedRun(new ArrayList<>(levelFiles)));
+ }
+ }
+
+ return runs;
+ }
+
+ /**
+ * Pick candidates for size-ratio based compaction.
+ *
+ * <p>Starting from the newest run (index 0), accumulate runs as long as:
{@code accumulatedSize
+ * / nextRunSize * 100 < sizeRatioPercent}.
+ *
+ * @return the number of runs to merge (from index 0). Returns 0 or 1 if
no suitable candidates.
+ */
+ private int pickSizeRatioCandidates(List<SortedRun> sortedRuns) {
+ if (sortedRuns.size() < 2) {
+ return 0;
+ }
+
+ long accumulatedSize = sortedRuns.get(0).totalSize();
+ int candidateCount = 1;
+
+ for (int i = 1; i < sortedRuns.size(); i++) {
+ long nextRunSize = sortedRuns.get(i).totalSize();
+
+ // Check: accumulatedSize / nextRunSize * 100 < sizeRatioPercent
+ // Rewritten to avoid floating point: accumulatedSize * 100 <
sizeRatioPercent *
+ // nextRunSize
+ if (accumulatedSize * 100 < (long) sizeRatioPercent * nextRunSize)
{
+ accumulatedSize += nextRunSize;
+ candidateCount++;
+ } else {
+ break;
+ }
+ }
+
+ return candidateCount;
+ }
+
    /**
     * Merge multiple sorted runs into a single sorted run using a min-heap based multi-way merge.
     *
     * <p>Files are first grouped by key-range overlap; single-file groups that need no rewriting
     * are "promoted" to the output level without being re-written, and their paths are reported
     * as skipped so the caller does not delete them.
     *
     * @param runsToMerge the sorted runs to merge (ordered newest-first)
     * @param dropTombstones whether to drop tombstone entries
     * @param fileSupplier supplier for new SST file paths
     * @param outputLevel the level to assign to all output files
     * @return the merged sorted run with output level set on all files
     */
    private MergeResult mergeSortedRuns(
            List<SortedRun> runsToMerge,
            boolean dropTombstones,
            FileSupplier fileSupplier,
            int outputLevel)
            throws IOException {

        List<SstFileMetadata> allFiles = new ArrayList<>();
        for (SortedRun run : runsToMerge) {
            allFiles.addAll(run.files);
        }

        long totalInputSize = 0;
        for (SstFileMetadata meta : allFiles) {
            totalInputSize += meta.getFileSize();
        }
        LOG.info(
                "Starting merge: {} runs, {} total files, {} total bytes, dropTombstones={}",
                runsToMerge.size(),
                allFiles.size(),
                totalInputSize,
                dropTombstones);

        long mergeStartTime = System.currentTimeMillis();

        List<List<SstFileMetadata>> groups = groupFilesByKeyOverlap(allFiles);
        List<List<SstFileMetadata>> mergedGroups = mergeSmallAdjacentGroups(groups);
        Map<File, Integer> fileToRunSequence = buildFileToRunSequenceMap(runsToMerge);

        List<SstFileMetadata> outputFiles = new ArrayList<>();
        Set<File> skippedFileSet = new HashSet<>();
        int skippedGroupCount = 0;
        int mergedGroupCount = 0;

        for (List<SstFileMetadata> group : mergedGroups) {
            if (group.size() == 1) {
                // A lone file with no key overlap can be reused as-is, unless we are dropping
                // tombstones and it contains some (then it must be rewritten to purge them).
                SstFileMetadata singleFile = group.get(0);
                boolean canSkip = !dropTombstones || !singleFile.hasTombstones();
                if (canSkip) {
                    SstFileMetadata promoted = singleFile.withLevel(outputLevel);
                    outputFiles.add(promoted);
                    // Record the path so deleteOldFiles() keeps the physical file.
                    skippedFileSet.add(promoted.getFile());
                    skippedGroupCount++;
                    continue;
                }
            }

            mergedGroupCount++;
            List<SstFileMetadata> groupMerged =
                    mergeFileGroup(
                            group, dropTombstones, fileSupplier, fileToRunSequence, outputLevel);
            outputFiles.addAll(groupMerged);
        }

        // Groups may finish out of key order; restore global ordering by min key.
        outputFiles.sort((a, b) -> keyComparator.compare(a.getMinKey(), b.getMinKey()));

        long mergeElapsedMs = System.currentTimeMillis() - mergeStartTime;
        long totalOutputSize = 0;
        for (SstFileMetadata meta : outputFiles) {
            totalOutputSize += meta.getFileSize();
        }
        LOG.info(
                "Merge completed in {} ms: {} input files ({} bytes) -> {} output files "
                        + "({} bytes), {} groups merged, {} groups skipped",
                mergeElapsedMs,
                allFiles.size(),
                totalInputSize,
                outputFiles.size(),
                totalOutputSize,
                mergedGroupCount,
                skippedGroupCount);

        return new MergeResult(new SortedRun(outputFiles), skippedFileSet, outputLevel);
    }
+
+ /**
+ * Group files into connected components by key-range overlap using a
sweep-line algorithm.
+ * Files are sorted by minKey, then adjacent files whose key ranges
overlap are placed in the
+ * same group.
+ */
+ private List<List<SstFileMetadata>>
groupFilesByKeyOverlap(List<SstFileMetadata> allFiles) {
+ List<SstFileMetadata> sortedFiles = new ArrayList<>(allFiles);
+ sortedFiles.sort((a, b) -> keyComparator.compare(a.getMinKey(),
b.getMinKey()));
+
+ List<List<SstFileMetadata>> groups = new ArrayList<>();
+ List<SstFileMetadata> currentGroup = new ArrayList<>();
+ MemorySlice currentGroupMaxKey = null;
+
+ for (SstFileMetadata file : sortedFiles) {
+ if (currentGroup.isEmpty()) {
+ currentGroup.add(file);
+ currentGroupMaxKey = file.getMaxKey();
+ } else if (keyComparator.compare(file.getMinKey(),
currentGroupMaxKey) <= 0) {
+ currentGroup.add(file);
+ if (keyComparator.compare(file.getMaxKey(),
currentGroupMaxKey) > 0) {
+ currentGroupMaxKey = file.getMaxKey();
+ }
+ } else {
+ groups.add(currentGroup);
+ currentGroup = new ArrayList<>();
+ currentGroup.add(file);
+ currentGroupMaxKey = file.getMaxKey();
+ }
+ }
+ if (!currentGroup.isEmpty()) {
+ groups.add(currentGroup);
+ }
+ return groups;
+ }
+
+ /**
+ * Merge small adjacent groups to avoid producing too many small files. If
either the pending
+ * group or the current group is smaller than half of {@link
#maxOutputFileSize}, they are
+ * combined into a single group.
+ */
+ private List<List<SstFileMetadata>> mergeSmallAdjacentGroups(
+ List<List<SstFileMetadata>> groups) {
+ long smallFileThreshold = maxOutputFileSize / 2;
+ List<List<SstFileMetadata>> mergedGroups = new ArrayList<>();
+ List<SstFileMetadata> pendingGroup = null;
+
+ for (List<SstFileMetadata> group : groups) {
+ if (pendingGroup == null) {
+ pendingGroup = new ArrayList<>(group);
+ } else {
+ long pendingSize = groupTotalSize(pendingGroup);
+ long groupSize = groupTotalSize(group);
+ if (pendingSize < smallFileThreshold || groupSize <
smallFileThreshold) {
+ pendingGroup.addAll(group);
+ } else {
+ mergedGroups.add(pendingGroup);
+ pendingGroup = new ArrayList<>(group);
+ }
+ }
+ }
+ if (pendingGroup != null) {
+ mergedGroups.add(pendingGroup);
+ }
+ return mergedGroups;
+ }
+
+ /**
+ * Build a mapping from each file to its run sequence number. Older runs
receive lower sequence
+ * numbers so that during dedup the newest entry (highest sequence) wins.
+ */
+ private static Map<File, Integer>
buildFileToRunSequenceMap(List<SortedRun> runsToMerge) {
+ Map<File, Integer> fileToRunSequence = new HashMap<>();
+ for (int runIdx = 0; runIdx < runsToMerge.size(); runIdx++) {
+ int sequence = runsToMerge.size() - 1 - runIdx;
+ for (SstFileMetadata meta : runsToMerge.get(runIdx).files) {
+ fileToRunSequence.put(meta.getFile(), sequence);
+ }
+ }
+ return fileToRunSequence;
+ }
+
+ private static long groupTotalSize(List<SstFileMetadata> group) {
+ long size = 0;
+ for (SstFileMetadata meta : group) {
+ size += meta.getFileSize();
+ }
+ return size;
+ }
+
    /**
     * Merge a group of files using min-heap based multi-way merge.
     *
     * <p>For equal keys the heap yields the entry with the higher sequence (newer run) first; the
     * duplicate-skip check against {@code previousKey} then discards all older versions of that
     * key, so the newest version always wins.
     *
     * @param group the files to merge (may be from different runs)
     * @param dropTombstones whether to drop tombstone entries
     * @param fileSupplier supplier for new SST file paths
     * @param fileToRunSequence maps each file to its run sequence number for dedup ordering
     * @param outputLevel the level to assign to output files
     * @return the list of merged output files
     */
    private List<SstFileMetadata> mergeFileGroup(
            List<SstFileMetadata> group,
            boolean dropTombstones,
            FileSupplier fileSupplier,
            Map<File, Integer> fileToRunSequence,
            int outputLevel)
            throws IOException {

        // Sort files by run sequence (older first) for correct dedup ordering
        List<SstFileMetadata> orderedFiles = new ArrayList<>(group);
        orderedFiles.sort(
                (a, b) -> {
                    int seqA = fileToRunSequence.getOrDefault(a.getFile(), 0);
                    int seqB = fileToRunSequence.getOrDefault(b.getFile(), 0);
                    return Integer.compare(seqA, seqB);
                });

        List<SstFileMetadata> result = new ArrayList<>();
        List<SortLookupStoreReader> openReaders = new ArrayList<>();
        // Heap order: ascending key; ties broken by descending sequence so newer data surfaces
        // before older data for the same key.
        PriorityQueue<MergeEntry> minHeap =
                new PriorityQueue<>(
                        (a, b) -> {
                            int keyCompare = keyComparator.compare(a.key, b.key);
                            if (keyCompare != 0) {
                                return keyCompare;
                            }
                            return Integer.compare(b.sequence, a.sequence);
                        });

        SortLookupStoreWriter currentWriter = null;
        try {
            // Open one MergeSource per file; the local index doubles as the tie-break sequence
            // (orderedFiles is sorted older-first, so a higher index means newer data).
            for (int seq = 0; seq < orderedFiles.size(); seq++) {
                SortLookupStoreReader reader =
                        storeFactory.createReader(orderedFiles.get(seq).getFile());
                openReaders.add(reader);
                SstFileReader.SstFileIterator fileIterator = reader.createIterator();
                MergeSource source = new MergeSource(fileIterator, seq);
                if (source.advance()) {
                    minHeap.add(source.currentEntry());
                }
            }
            // State of the output file currently being written (null when none is open).
            File currentSstFile = null;
            MemorySlice currentFileMinKey = null;
            MemorySlice currentFileMaxKey = null;
            long currentBatchSize = 0;
            long currentTombstoneCount = 0;
            MemorySlice previousKey = null;

            while (!minHeap.isEmpty()) {
                MergeEntry entry = minHeap.poll();

                // Duplicate of the previously emitted key: an older version — drop it, but keep
                // the source advancing so its remaining entries still flow into the heap.
                if (previousKey != null && keyComparator.compare(entry.key, previousKey) == 0) {
                    if (entry.source.advance()) {
                        minHeap.add(entry.source.currentEntry());
                    }
                    continue;
                }

                // NOTE(review): entry.key is retained here (and as min/max key below) without
                // copyBytes(), while values ARE copied in MergeSource — assumes the key slices
                // returned by BlockIterator remain stable after advancing; confirm.
                previousKey = entry.key;

                if (entry.source.advance()) {
                    minHeap.add(entry.source.currentEntry());
                }

                if (dropTombstones && isTombstone(entry.value)) {
                    continue;
                }

                // Lazily open an output file on the first surviving entry.
                if (currentWriter == null) {
                    currentSstFile = fileSupplier.newSstFile();
                    currentWriter = storeFactory.createWriter(currentSstFile, null);
                    currentFileMinKey = entry.key;
                    currentBatchSize = 0;
                    currentTombstoneCount = 0;
                }

                currentWriter.put(entry.key.copyBytes(), entry.value);
                currentFileMaxKey = entry.key;
                currentBatchSize += entry.key.length() + entry.value.length;
                if (isTombstone(entry.value)) {
                    currentTombstoneCount++;
                }

                // Roll over to a new output file once the size target is reached.
                if (currentBatchSize >= maxOutputFileSize) {
                    currentWriter.close();
                    result.add(
                            new SstFileMetadata(
                                    currentSstFile,
                                    currentFileMinKey,
                                    currentFileMaxKey,
                                    currentTombstoneCount,
                                    outputLevel));
                    currentWriter = null;
                    currentSstFile = null;
                    currentFileMinKey = null;
                    currentFileMaxKey = null;
                }
            }

            // Seal the final, partially filled output file (if any entries were written).
            if (currentWriter != null) {
                currentWriter.close();
                result.add(
                        new SstFileMetadata(
                                currentSstFile,
                                currentFileMinKey,
                                currentFileMaxKey,
                                currentTombstoneCount,
                                outputLevel));
            }
        } catch (IOException | RuntimeException e) {
            // Close the in-progress writer on failure to avoid resource leak
            // (already-completed output files in `result` are left on disk).
            if (currentWriter != null) {
                try {
                    currentWriter.close();
                } catch (IOException suppressed) {
                    e.addSuppressed(suppressed);
                }
            }
            throw e;
        } finally {
            for (SortLookupStoreReader reader : openReaders) {
                reader.close();
            }
        }

        return result;
    }
+
+ /**
+ * Find the highest level that is either empty or occupied by one of the
runs being merged.
+ * Starts from {@code maxLevels - 1} and works downward, stopping at level
1 minimum (level 0 is
+ * reserved for new flushes).
+ */
+ private static int findHighestFreeLevel(
+ List<List<SstFileMetadata>> levels, int maxLevels, List<SortedRun>
runsBeingMerged) {
+ Set<Integer> mergedLevels = new HashSet<>();
+ for (SortedRun run : runsBeingMerged) {
+ for (SstFileMetadata file : run.files) {
+ mergedLevels.add(file.getLevel());
+ }
+ }
+
+ // Start from the highest level and work down; a level is usable if it
is empty or
+ // all its files belong to the runs being merged
+ for (int level = maxLevels - 1; level >= 1; level--) {
+ if (levels.get(level).isEmpty() || mergedLevels.contains(level)) {
+ return level;
+ }
+ }
+ return 1;
+ }
+
+ /** Clear the level lists for all levels that contain files from the given
runs. */
+ private static void clearLevelsOfRuns(
+ List<List<SstFileMetadata>> levels, List<SortedRun> runs) {
+ Set<Integer> levelsToClear = new HashSet<>();
+ for (SortedRun run : runs) {
+ for (SstFileMetadata file : run.files) {
+ levelsToClear.add(file.getLevel());
+ }
+ }
+ for (int level : levelsToClear) {
+ if (level >= 0 && level < levels.size()) {
+ levels.get(level).clear();
+ }
+ }
+ }
+
+ /** Delete old SST files from the merged sorted runs, skipping files that
were preserved. */
+ private void deleteOldFiles(List<SortedRun> oldRuns, Set<File>
skippedFiles) {
+ for (SortedRun run : oldRuns) {
+ for (SstFileMetadata meta : run.files) {
+ if (!skippedFiles.contains(meta.getFile())) {
+ fileDeleter.deleteFile(meta.getFile());
+ }
+ }
+ }
+ }
+
+ //
-------------------------------------------------------------------------
+ // Sorted Run
+ //
-------------------------------------------------------------------------
+
    /** Result of a merge operation, containing the merged run, skipped files and output level. */
    private static final class MergeResult {
        // The single sorted run produced by the merge.
        final SortedRun mergedRun;
        // Files that were promoted without rewriting; the caller must not delete them.
        final Set<File> skippedFiles;
        // Level assigned to all files of the merged run.
        final int outputLevel;

        MergeResult(SortedRun mergedRun, Set<File> skippedFiles, int outputLevel) {
            this.mergedRun = mergedRun;
            this.skippedFiles = skippedFiles;
            this.outputLevel = outputLevel;
        }
    }
+
+ /** A sorted run is a list of SST files whose key ranges do not overlap. */
+ static final class SortedRun {
+
+ final List<SstFileMetadata> files;
+
+ SortedRun(List<SstFileMetadata> files) {
+ this.files = files;
+ }
+
+ long totalSize() {
+ long size = 0;
+ for (SstFileMetadata meta : files) {
+ size += meta.getFileSize();
+ }
+ return size;
+ }
+ }
+
+ //
-------------------------------------------------------------------------
+ // Merge helpers
+ //
-------------------------------------------------------------------------
+
    /**
     * Wraps an SST file iterator as a merge source, lazily reading blocks and entries. Each source
     * has a sequence number to resolve key conflicts (higher sequence = newer data).
     */
    private static final class MergeSource {

        private final SstFileReader.SstFileIterator fileIterator;
        private final int sequence;
        // Block currently being consumed; null once the file is exhausted.
        private BlockIterator currentBlock;
        // Entry produced by the last successful advance(); null before first/after last entry.
        private MergeEntry current;

        MergeSource(SstFileReader.SstFileIterator fileIterator, int sequence) throws IOException {
            this.fileIterator = fileIterator;
            this.sequence = sequence;
            this.currentBlock = fileIterator.readBatch();
        }

        /**
         * Move to the next entry, reading the next block when the current one is drained.
         *
         * @return true if an entry is available via {@link #currentEntry()}, false at end of file
         */
        boolean advance() throws IOException {
            while (true) {
                if (currentBlock != null && currentBlock.hasNext()) {
                    Map.Entry<MemorySlice, MemorySlice> entry = currentBlock.next();
                    // The value is copied out of the block buffer; the key slice is passed
                    // through as-is — presumably stable across iteration, TODO confirm.
                    current =
                            new MergeEntry(
                                    entry.getKey(), entry.getValue().copyBytes(), sequence, this);
                    return true;
                }
                // Current block exhausted: fetch the next batch, or signal end of file.
                currentBlock = fileIterator.readBatch();
                if (currentBlock == null) {
                    current = null;
                    return false;
                }
            }
        }

        MergeEntry currentEntry() {
            return current;
        }
    }
+
    /** A single key-value entry from a merge source, used as a heap element. */
    private static final class MergeEntry {

        final MemorySlice key;
        // Value bytes, already copied out of the source block buffer.
        final byte[] value;
        // Recency rank of the originating source within the merge; higher = newer.
        final int sequence;
        // Back-reference used to advance the source after this entry is consumed.
        final MergeSource source;

        MergeEntry(MemorySlice key, byte[] value, int sequence, MergeSource source) {
            this.key = key;
            this.value = value;
            this.sequence = sequence;
            this.source = source;
        }
    }
+
    /** Functional interface for supplying new SST file paths. */
    public interface FileSupplier {
        /** Returns the path for a brand-new SST output file. */
        File newSstFile();
    }
+
    /** Functional interface for deleting SST files, allowing callers to clean up resources. */
    public interface FileDeleter {
        /** Deletes (or schedules deletion of) the given obsolete SST file. */
        void deleteFile(File file);
    }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDb.java
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDb.java
new file mode 100644
index 0000000000..96df7eef9a
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDb.java
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.sort.db;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.compression.CompressOptions;
+import org.apache.paimon.io.cache.CacheManager;
+import org.apache.paimon.lookup.sort.SortLookupStoreFactory;
+import org.apache.paimon.lookup.sort.SortLookupStoreReader;
+import org.apache.paimon.lookup.sort.SortLookupStoreWriter;
+import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.options.MemorySize;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * A simple LSM-Tree based KV database built on top of {@link
SortLookupStoreFactory}.
+ *
+ * <p>Architecture (Universal Compaction, inspired by RocksDB):
+ *
+ * <pre>
+ * ┌──────────────────────────────────────────────┐
+ * │ MemTable (SkipList) │ ← Active writes
+ * ├──────────────────────────────────────────────┤
+ * │ Sorted Runs (newest → oldest): │
+ * │ [Run-0] [Run-1] [Run-2] ... [Run-N] │ ← Each run is a
sorted SST file set
+ * └──────────────────────────────────────────────┘
+ * </pre>
+ *
+ * <p>Compaction is triggered when the number of sorted runs exceeds a
threshold. Runs are selected
+ * for merging based on size ratios between adjacent runs, following RocksDB's
Universal Compaction
+ * strategy.
+ *
+ * <p>Note: No WAL is implemented. Data in the MemTable will be lost on crash.
+ *
+ * <p>This class is <b>not</b> thread-safe. External synchronization is
required if accessed from
+ * multiple threads.
+ */
+public class SimpleLsmKvDb implements Closeable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SimpleLsmKvDb.class);
+
+ /** Tombstone marker for deleted keys. */
+ private static final byte[] TOMBSTONE = new byte[0];
+
+ /** Maximum number of levels in the LSM tree. */
+ static final int MAX_LEVELS = 4;
+
+ /**
+ * Estimated per-entry memory overhead in the MemTable's TreeMap, beyond
the raw key/value
+ * bytes. This accounts for:
+ *
+ * <ul>
+ * <li>TreeMap.Entry node: ~64 bytes (object header +
left/right/parent/key/value refs +
+ * color)
+ * <li>MemorySlice wrapper: ~32 bytes (object header + segment ref +
offset + length)
+ * <li>MemorySegment backing the key: ~48 bytes (object header +
heapMemory/offHeapBuffer refs
+ * + address + size)
+ * <li>byte[] value array header: ~16 bytes (object header + length)
+ * </ul>
+ */
+ static final long PER_ENTRY_OVERHEAD = 160;
+
+ private final File dataDirectory;
+ private final SortLookupStoreFactory storeFactory;
+ private final Comparator<MemorySlice> keyComparator;
+ private final long memTableFlushThreshold;
+ private final LsmCompactor compactor;
+
+ /** Active MemTable: key -> value bytes (empty byte[] = tombstone). */
+ private TreeMap<MemorySlice, byte[]> memTable;
+
+ /** Estimated size of the current MemTable in bytes. */
+ private long memTableSize;
+
+ /**
+ * Multi-level SST file storage. Each level contains a list of {@link
SstFileMetadata} ordered
+ * by key range. Level 0 files are ordered newest-first (key ranges may
overlap). Level 1+ files
+ * are ordered by minKey (key ranges do NOT overlap).
+ */
+ private final List<List<SstFileMetadata>> levels;
+
+ /** Cached readers for SST files, keyed by file path. Lazily populated on
first lookup. */
+ private final Map<File, SortLookupStoreReader> readerCache;
+
+ private long fileSequence;
+ private boolean closed;
+
+ private SimpleLsmKvDb(
+ File dataDirectory,
+ SortLookupStoreFactory storeFactory,
+ Comparator<MemorySlice> keyComparator,
+ long memTableFlushThreshold,
+ long maxSstFileSize,
+ int level0FileNumCompactTrigger,
+ int sizeRatio) {
+ this.dataDirectory = dataDirectory;
+ this.storeFactory = storeFactory;
+ this.keyComparator = keyComparator;
+ this.memTableFlushThreshold = memTableFlushThreshold;
+ this.memTable = new TreeMap<>(keyComparator);
+ this.memTableSize = 0;
+ this.levels = new ArrayList<>();
+ for (int i = 0; i < MAX_LEVELS; i++) {
+ this.levels.add(new ArrayList<>());
+ }
+ this.readerCache = new HashMap<>();
+ this.fileSequence = 0;
+ this.closed = false;
+ this.compactor =
+ new LsmCompactor(
+ keyComparator,
+ storeFactory,
+ maxSstFileSize,
+ level0FileNumCompactTrigger,
+ sizeRatio,
+ this::closeAndDeleteSstFile);
+ }
+
+ /**
+ * Close the cached reader for the given SST file (if any) and delete the
file from disk. This
+ * is invoked by {@link LsmCompactor} via the {@link
LsmCompactor.FileDeleter} callback during
+ * compaction.
+ */
+ private void closeAndDeleteSstFile(File file) {
+ SortLookupStoreReader reader = readerCache.remove(file);
+ if (reader != null) {
+ try {
+ reader.close();
+ } catch (IOException e) {
+ LOG.warn("Failed to close reader for SST file: {}",
file.getName(), e);
+ }
+ }
+ if (file.exists()) {
+ boolean deleted = file.delete();
+ if (!deleted) {
+ LOG.warn("Failed to delete SST file: {}", file.getName());
+ }
+ }
+ }
+
+ //
-------------------------------------------------------------------------
+ // Builder
+ //
-------------------------------------------------------------------------
+
+ /** Create a builder for {@link SimpleLsmKvDb}. */
+ public static Builder builder(File dataDirectory) {
+ return new Builder(dataDirectory);
+ }
+
+ //
-------------------------------------------------------------------------
+ // Write Operations
+ //
-------------------------------------------------------------------------
+
+ /**
+ * Put a key-value pair into the database.
+ *
+ * @param key the key bytes, must not be null
+ * @param value the value bytes, must not be null
+ */
+ public void put(byte[] key, byte[] value) throws IOException {
+ ensureOpen();
+ if (value.length == 0) {
+ throw new IllegalArgumentException(
+ "Value must not be an empty byte array, which is reserved
as TOMBSTONE marker. "
+ + "Use delete() to remove a key.");
+ }
+ MemorySlice wrappedKey = MemorySlice.wrap(key);
+ byte[] oldValue = memTable.put(wrappedKey, value);
+ long delta = key.length + value.length;
+ if (oldValue != null) {
+ delta -= (key.length + oldValue.length);
+ } else {
+ delta += PER_ENTRY_OVERHEAD;
+ }
+ memTableSize += delta;
+ maybeFlushMemTable();
+ }
+
+ /**
+ * Delete a key from the database by writing a tombstone.
+ *
+ * @param key the key bytes to delete
+ */
+ public void delete(byte[] key) throws IOException {
+ ensureOpen();
+ MemorySlice wrappedKey = MemorySlice.wrap(key);
+ byte[] oldValue = memTable.put(wrappedKey, TOMBSTONE);
+ long delta = key.length;
+ if (oldValue != null) {
+ delta -= (key.length + oldValue.length);
+ } else {
+ delta += PER_ENTRY_OVERHEAD;
+ }
+ memTableSize += delta;
+ maybeFlushMemTable();
+ }
+
+ //
-------------------------------------------------------------------------
+ // Read Operations
+ //
-------------------------------------------------------------------------
+
+ /**
+ * Get the value associated with the given key.
+ *
+ * <p>Search order: MemTable → Level 0 (newest to oldest) → Level 1 →
Level 2 → ...
+ *
+ * @param key the key bytes
+ * @return the value bytes, or null if the key does not exist or has been
deleted
+ */
+ @Nullable
+ public byte[] get(byte[] key) throws IOException {
+ ensureOpen();
+
+ // 1. Search MemTable first (newest data)
+ MemorySlice wrappedKey = MemorySlice.wrap(key);
+ byte[] memValue = memTable.get(wrappedKey);
+ if (memValue != null) {
+ return isTombstone(memValue) ? null : memValue;
+ }
+
+ // 2. Search each level from L0 to Lmax
+ for (int level = 0; level < MAX_LEVELS; level++) {
+ List<SstFileMetadata> levelFiles = levels.get(level);
+ if (levelFiles.isEmpty()) {
+ continue;
+ }
+
+ if (level == 0) {
+ // L0: files may have overlapping keys, search newest-first
+ for (SstFileMetadata meta : levelFiles) {
+ if (!meta.mightContainKey(wrappedKey, keyComparator)) {
+ continue;
+ }
+ byte[] value = lookupInFile(meta.getFile(), key);
+ if (value != null) {
+ return isTombstone(value) ? null : value;
+ }
+ }
+ } else {
+ // L1+: files have non-overlapping key ranges, binary search
+ SstFileMetadata target = findFileForKey(levelFiles,
wrappedKey);
+ if (target != null) {
+ byte[] value = lookupInFile(target.getFile(), key);
+ if (value != null) {
+ return isTombstone(value) ? null : value;
+ }
+ }
+ }
+ }
+
+ return null;
+ }
+
+ //
-------------------------------------------------------------------------
+ // Flush & Compaction
+ //
-------------------------------------------------------------------------
+
+ /** Force flush the current MemTable to a Level 0 SST file. */
+ public void flush() throws IOException {
+ ensureOpen();
+ if (memTable.isEmpty()) {
+ return;
+ }
+
+ TreeMap<MemorySlice, byte[]> snapshot = memTable;
+ memTable = new TreeMap<>(keyComparator);
+ memTableSize = 0;
+
+ SstFileMetadata metadata = writeMemTableToSst(snapshot);
+
+ levels.get(0).add(0, metadata);
+
+ LOG.info(
+ "Flushed MemTable to L0 SST file: {}, entries: {}",
+ metadata.getFile().getName(),
+ snapshot.size());
+
+ compactor.maybeCompact(levels, MAX_LEVELS, this::newSstFile);
+ }
+
+ /**
+ * Force a full compaction of all levels into the deepest level. This
merges all data and cleans
+ * up tombstones (which are only removed at the max level), reducing the
total number of SST
+ * files to the minimum.
+ */
+ public void compact() throws IOException {
+ ensureOpen();
+ compactor.fullCompact(levels, MAX_LEVELS, this::newSstFile);
+ }
+
+ //
-------------------------------------------------------------------------
+ // Lifecycle
+ //
-------------------------------------------------------------------------
+
+ @Override
+ public void close() throws IOException {
+ if (closed) {
+ return;
+ }
+ closed = true;
+
+ // Flush remaining MemTable data to L0
+ if (!memTable.isEmpty()) {
+ TreeMap<MemorySlice, byte[]> snapshot = memTable;
+ memTable = new TreeMap<>(keyComparator);
+ memTableSize = 0;
+
+ SstFileMetadata metadata = writeMemTableToSst(snapshot);
+ levels.get(0).add(0, metadata);
+ }
+
+ // Close all cached readers
+ for (SortLookupStoreReader reader : readerCache.values()) {
+ try {
+ reader.close();
+ } catch (IOException e) {
+ LOG.warn("Failed to close cached reader during shutdown", e);
+ }
+ }
+ readerCache.clear();
+
+ LOG.info("SimpleLsmKvDb closed. Level stats: {}", getLevelStats());
+ }
+
+ /** Return the total number of SST files across all levels. */
+ @VisibleForTesting
+ int getSstFileCount() {
+ int count = 0;
+ for (List<SstFileMetadata> levelFiles : levels) {
+ count += levelFiles.size();
+ }
+ return count;
+ }
+
+ /** Return the number of SST files at a specific level. */
+ public int getLevelFileCount(int level) {
+ if (level < 0 || level >= MAX_LEVELS) {
+ return 0;
+ }
+ return levels.get(level).size();
+ }
+
+ /** Return the estimated MemTable size in bytes. */
+ public long getMemTableSize() {
+ return memTableSize;
+ }
+
+ /** Return a human-readable summary of file counts per level. */
+ public String getLevelStats() {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < MAX_LEVELS; i++) {
+ int count = levels.get(i).size();
+ if (count > 0) {
+ if (sb.length() > 0) {
+ sb.append(", ");
+ }
+ sb.append("L").append(i).append("=").append(count);
+ }
+ }
+ return sb.length() == 0 ? "empty" : sb.toString();
+ }
+
+ //
-------------------------------------------------------------------------
+ // Internal Helpers
+ //
-------------------------------------------------------------------------
+
+ private void maybeFlushMemTable() throws IOException {
+ if (memTableSize >= memTableFlushThreshold) {
+ flush();
+ }
+ }
+
+ @Nullable
+ private byte[] lookupInFile(File file, byte[] key) throws IOException {
+ SortLookupStoreReader reader = readerCache.get(file);
+ if (reader == null) {
+ reader = storeFactory.createReader(file);
+ readerCache.put(file, reader);
+ }
+ return reader.lookup(key);
+ }
+
+ @Nullable
+ private SstFileMetadata findFileForKey(List<SstFileMetadata> sortedFiles,
MemorySlice key) {
+ int low = 0;
+ int high = sortedFiles.size() - 1;
+ while (low <= high) {
+ int mid = low + (high - low) / 2;
+ SstFileMetadata midFile = sortedFiles.get(mid);
+ if (keyComparator.compare(key, midFile.getMinKey()) < 0) {
+ high = mid - 1;
+ } else if (keyComparator.compare(key, midFile.getMaxKey()) > 0) {
+ low = mid + 1;
+ } else {
+ return midFile;
+ }
+ }
+ return null;
+ }
+
+ private SstFileMetadata writeMemTableToSst(TreeMap<MemorySlice, byte[]>
data)
+ throws IOException {
+ File sstFile = newSstFile();
+ SortLookupStoreWriter writer = storeFactory.createWriter(sstFile,
null);
+ MemorySlice minKey = null;
+ MemorySlice maxKey = null;
+ long tombstoneCount = 0;
+ try {
+ for (Map.Entry<MemorySlice, byte[]> entry : data.entrySet()) {
+ writer.put(entry.getKey().copyBytes(), entry.getValue());
+ if (minKey == null) {
+ minKey = entry.getKey();
+ }
+ maxKey = entry.getKey();
+ if (isTombstone(entry.getValue())) {
+ tombstoneCount++;
+ }
+ }
+ } finally {
+ writer.close();
+ }
+ return new SstFileMetadata(sstFile, minKey, maxKey, tombstoneCount, 0);
+ }
+
+ private File newSstFile() {
+ long sequence = fileSequence++;
+ return new File(dataDirectory, String.format("sst-%06d.db", sequence));
+ }
+
+ private void ensureOpen() {
+ if (closed) {
+ throw new IllegalStateException("SimpleLsmKvDb is already closed");
+ }
+ }
+
+ //
-------------------------------------------------------------------------
+ // Builder
+ //
-------------------------------------------------------------------------
+
+ /** Builder for {@link SimpleLsmKvDb}. */
+ public static class Builder {
+
+ private final File dataDirectory;
+ private long memTableFlushThreshold = 64 * 1024 * 1024; // 64 MB
+ private long maxSstFileSize = 8 * 1024 * 1024; // 8 MB
+ private int blockSize = 32 * 1024; // 32 KB
+ private long cacheSize = 128 * 1024 * 1024; // 128 MB
+ private int level0FileNumCompactTrigger = 4;
+ private int sizeRatio = 10;
+ private CompressOptions compressOptions =
CompressOptions.defaultOptions();
+ private Comparator<MemorySlice> keyComparator = MemorySlice::compareTo;
+
+ Builder(File dataDirectory) {
+ this.dataDirectory = dataDirectory;
+ }
+
+ /** Set the MemTable flush threshold in bytes. Default is 64 MB. */
+ public Builder memTableFlushThreshold(long thresholdBytes) {
+ this.memTableFlushThreshold = thresholdBytes;
+ return this;
+ }
+
+ /** Set the maximum SST file size produced by compaction in bytes.
Default is 8 MB. */
+ public Builder maxSstFileSize(long maxSstFileSize) {
+ this.maxSstFileSize = maxSstFileSize;
+ return this;
+ }
+
+ /** Set the SST block size in bytes. Default is 32 KB. */
+ public Builder blockSize(int blockSize) {
+ this.blockSize = blockSize;
+ return this;
+ }
+
+ /** Set the block cache size in bytes. Default is 128 MB. */
+ public Builder cacheSize(long cacheSize) {
+ this.cacheSize = cacheSize;
+ return this;
+ }
+
+ /** Set the level 0 file number that triggers compaction. Default is
4. */
+ public Builder level0FileNumCompactTrigger(int fileNum) {
+ this.level0FileNumCompactTrigger = fileNum;
+ return this;
+ }
+
+ /**
+ * Set the size ratio percentage for Universal Compaction. When the
accumulated size of
+ * newer runs divided by the next run's size is less than this
percentage, the runs are
+ * merged together. Default is 10 (meaning 10%).
+ */
+ public Builder sizeRatio(int sizeRatio) {
+ this.sizeRatio = sizeRatio;
+ return this;
+ }
+
+ /** Set compression options. Default is zstd level 1. */
+ public Builder compressOptions(CompressOptions compressOptions) {
+ this.compressOptions = compressOptions;
+ return this;
+ }
+
+ /**
+ * Set a custom key comparator. Default is unsigned lexicographic byte
comparison.
+ *
+ * <p>The comparator must be consistent with the {@link
SortLookupStoreFactory}'s comparator
+ * so that SST file lookups return correct results.
+ */
+ public Builder keyComparator(Comparator<MemorySlice> keyComparator) {
+ this.keyComparator = keyComparator;
+ return this;
+ }
+
+ /** Build the {@link SimpleLsmKvDb} instance. */
+ public SimpleLsmKvDb build() {
+ if (!dataDirectory.exists()) {
+ boolean created = dataDirectory.mkdirs();
+ if (!created) {
+ throw new IllegalStateException(
+ "Failed to create data directory: " +
dataDirectory);
+ }
+ }
+
+ CacheManager cacheManager = new
CacheManager(MemorySize.ofBytes(cacheSize));
+ SortLookupStoreFactory factory =
+ new SortLookupStoreFactory(
+ keyComparator, cacheManager, blockSize,
compressOptions);
+
+ return new SimpleLsmKvDb(
+ dataDirectory,
+ factory,
+ keyComparator,
+ memTableFlushThreshold,
+ maxSstFileSize,
+ level0FileNumCompactTrigger,
+ sizeRatio);
+ }
+ }
+
+ static boolean isTombstone(byte[] value) {
+ return value.length == 0;
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/SstFileMetadata.java
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/SstFileMetadata.java
new file mode 100644
index 0000000000..48e78d426c
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/SstFileMetadata.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.sort.db;
+
+import org.apache.paimon.memory.MemorySlice;
+
+import java.io.File;
+import java.util.Comparator;
+
+/**
+ * Metadata for an SST file, tracking its key range, size and level for
efficient compaction and
+ * lookup.
+ */
+public final class SstFileMetadata {
+
+ private final File file;
+ private final MemorySlice minKey;
+ private final MemorySlice maxKey;
+ private final long fileSize;
+ private final long tombstoneCount;
+ private final int level;
+
+ public SstFileMetadata(
+ File file, MemorySlice minKey, MemorySlice maxKey, long
tombstoneCount, int level) {
+ this(file, minKey, maxKey, file.length(), tombstoneCount, level);
+ }
+
+ public SstFileMetadata(
+ File file,
+ MemorySlice minKey,
+ MemorySlice maxKey,
+ long fileSize,
+ long tombstoneCount,
+ int level) {
+ this.file = file;
+ this.minKey = minKey;
+ this.maxKey = maxKey;
+ this.fileSize = fileSize;
+ this.tombstoneCount = tombstoneCount;
+ this.level = level;
+ }
+
+ public File getFile() {
+ return file;
+ }
+
+ public MemorySlice getMinKey() {
+ return minKey;
+ }
+
+ public MemorySlice getMaxKey() {
+ return maxKey;
+ }
+
+ public long getFileSize() {
+ return fileSize;
+ }
+
+ public int getLevel() {
+ return level;
+ }
+
+ public SstFileMetadata withLevel(int newLevel) {
+ return new SstFileMetadata(file, minKey, maxKey, fileSize,
tombstoneCount, newLevel);
+ }
+
+ public boolean hasTombstones() {
+ return tombstoneCount > 0;
+ }
+
+ public boolean mightContainKey(MemorySlice key, Comparator<MemorySlice>
comparator) {
+ return comparator.compare(key, minKey) >= 0 && comparator.compare(key,
maxKey) <= 0;
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java
b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java
index 613ffd80de..9fecb05ad1 100644
--- a/paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java
@@ -182,9 +182,11 @@ public class SstFileWriter {
@Nullable
public BlockHandle writeIndexBlock() throws IOException {
BlockHandle indexBlock = writeBlock(indexBlockWriter);
- LOG.info("Number of record: {}", recordCount);
- LOG.info("totalUncompressedSize: {}",
MemorySize.ofBytes(totalUncompressedSize));
- LOG.info("totalCompressedSize: {}",
MemorySize.ofBytes(totalCompressedSize));
+ LOG.info(
+ "SST written: records={}, uncompressed={}, compressed={}",
+ recordCount,
+ MemorySize.ofBytes(totalUncompressedSize),
+ MemorySize.ofBytes(totalCompressedSize));
return indexBlock;
}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDbTest.java
b/paimon-common/src/test/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDbTest.java
new file mode 100644
index 0000000000..e3b67fd3cd
--- /dev/null
+++
b/paimon-common/src/test/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDbTest.java
@@ -0,0 +1,1363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.sort.db;
+
+import org.apache.paimon.compression.CompressOptions;
+import org.apache.paimon.memory.MemorySlice;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Comparator;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/** Test for {@link SimpleLsmKvDb}. */
+public class SimpleLsmKvDbTest {
+
    // JUnit-managed temporary directory, recreated for every test method.
    @TempDir java.nio.file.Path tempDir;

    // Root directory of the database under test; initialized in setUp().
    private File dataDirectory;

    @BeforeEach
    public void setUp() {
        dataDirectory = new File(tempDir.toFile(), "test-db");
    }
+
    // Deliberately tiny thresholds (1 KB flush, 256 B blocks) so that small tests exercise the
    // flush and compaction paths; compression is disabled for deterministic file sizes.
    private SimpleLsmKvDb createDb() {
        return SimpleLsmKvDb.builder(dataDirectory)
                .memTableFlushThreshold(1024)
                .blockSize(256)
                .cacheSize(4 * 1024 * 1024)
                .level0FileNumCompactTrigger(4)
                .compressOptions(new CompressOptions("none", 1))
                .build();
    }
+
+ @Test
+ public void testBasicPutAndGet() throws IOException {
+ try (SimpleLsmKvDb db = createDb()) {
+ putString(db, "key1", "value1");
+ putString(db, "key2", "value2");
+ putString(db, "key3", "value3");
+
+ Assertions.assertEquals("value1", getString(db, "key1"));
+ Assertions.assertEquals("value2", getString(db, "key2"));
+ Assertions.assertEquals("value3", getString(db, "key3"));
+ Assertions.assertNull(getString(db, "nonexistent"));
+ }
+ }
+
+ @Test
+ public void testOverwriteValue() throws IOException {
+ try (SimpleLsmKvDb db = createDb()) {
+ putString(db, "key1", "value1");
+ Assertions.assertEquals("value1", getString(db, "key1"));
+
+ putString(db, "key1", "value2");
+ Assertions.assertEquals("value2", getString(db, "key1"));
+
+ putString(db, "key1", "value3");
+ Assertions.assertEquals("value3", getString(db, "key1"));
+ }
+ }
+
+ @Test
+ public void testDelete() throws IOException {
+ try (SimpleLsmKvDb db = createDb()) {
+ putString(db, "key1", "value1");
+ putString(db, "key2", "value2");
+
+ Assertions.assertEquals("value1", getString(db, "key1"));
+
+ deleteString(db, "key1");
+ Assertions.assertNull(getString(db, "key1"));
+
+ // key2 should still exist
+ Assertions.assertEquals("value2", getString(db, "key2"));
+
+ // deleting a non-existent key should not cause errors
+ deleteString(db, "nonexistent");
+ Assertions.assertNull(getString(db, "nonexistent"));
+ }
+ }
+
+ @Test
+ public void testDeleteAndReinsert() throws IOException {
+ try (SimpleLsmKvDb db = createDb()) {
+ putString(db, "key1", "value1");
+ Assertions.assertEquals("value1", getString(db, "key1"));
+
+ deleteString(db, "key1");
+ Assertions.assertNull(getString(db, "key1"));
+
+ putString(db, "key1", "value2");
+ Assertions.assertEquals("value2", getString(db, "key1"));
+ }
+ }
+
+ @Test
+ public void testFlushToSst() throws IOException {
+ try (SimpleLsmKvDb db = createDb()) {
+ Assertions.assertEquals(0, db.getSstFileCount());
+
+ // Write enough data to trigger a flush (threshold is 1024 bytes)
+ for (int i = 0; i < 100; i++) {
+ putString(db, String.format("key-%05d", i),
String.format("value-%05d", i));
+ }
+
+ // After writing enough data, at least one SST file should exist
+ Assertions.assertTrue(
+ db.getSstFileCount() > 0,
+ "Expected at least one SST file after writing enough
data");
+
+ // All data should still be readable
+ for (int i = 0; i < 100; i++) {
+ String expected = String.format("value-%05d", i);
+ String actual = getString(db, String.format("key-%05d", i));
+ Assertions.assertEquals(expected, actual, "Mismatch for key-"
+ i);
+ }
+ }
+ }
+
+ @Test
+ public void testReadFromSstAfterFlush() throws IOException {
+ try (SimpleLsmKvDb db = createDb()) {
+ putString(db, "alpha", "first");
+ putString(db, "beta", "second");
+ putString(db, "gamma", "third");
+
+ // Force flush
+ db.flush();
+ Assertions.assertTrue(db.getSstFileCount() > 0);
+
+ // Data should be readable from SST
+ Assertions.assertEquals("first", getString(db, "alpha"));
+ Assertions.assertEquals("second", getString(db, "beta"));
+ Assertions.assertEquals("third", getString(db, "gamma"));
+ }
+ }
+
+ @Test
+ public void testOverwriteAcrossFlush() throws IOException {
+ try (SimpleLsmKvDb db = createDb()) {
+ putString(db, "key1", "old-value");
+ db.flush();
+
+ // Overwrite in MemTable (newer than SST)
+ putString(db, "key1", "new-value");
+ Assertions.assertEquals("new-value", getString(db, "key1"));
+
+ // After another flush, the new value should persist
+ db.flush();
+ Assertions.assertEquals("new-value", getString(db, "key1"));
+ }
+ }
+
+ @Test
+ public void testDeleteAcrossFlush() throws IOException {
+ try (SimpleLsmKvDb db = createDb()) {
+ putString(db, "key1", "value1");
+ db.flush();
+
+ // Delete in MemTable
+ deleteString(db, "key1");
+ Assertions.assertNull(getString(db, "key1"));
+
+ // After flush, the tombstone should be in SST and key should
still be deleted
+ db.flush();
+ Assertions.assertNull(getString(db, "key1"));
+ }
+ }
+
+ @Test
+ public void testCompaction() throws IOException {
+ // Use a low compaction threshold to trigger compaction easily
+ SimpleLsmKvDb db =
+ SimpleLsmKvDb.builder(dataDirectory)
+ .memTableFlushThreshold(256)
+ .blockSize(128)
+ .cacheSize(4 * 1024 * 1024)
+ .level0FileNumCompactTrigger(3)
+ .compressOptions(new CompressOptions("none", 1))
+ .build();
+
+ try {
+ // Write data in batches to create multiple SST files
+ for (int batch = 0; batch < 5; batch++) {
+ for (int i = 0; i < 20; i++) {
+ int key = batch * 20 + i;
+ putString(
+ db,
+ String.format("key-%05d", key),
+ String.format("value-%05d-batch-%d", key, batch));
+ }
+ db.flush();
+ }
+
+ // Compaction may have been triggered automatically
+ // Verify all data is still accessible
+ for (int batch = 0; batch < 5; batch++) {
+ for (int i = 0; i < 20; i++) {
+ int key = batch * 20 + i;
+ String value = getString(db, String.format("key-%05d",
key));
+ Assertions.assertNotNull(
+ value, "Key key-" + key + " should exist after
compaction");
+ }
+ }
+ } finally {
+ db.close();
+ }
+ }
+
+ @Test
+ public void testCompactionRemovesTombstones() throws IOException {
+ try (SimpleLsmKvDb db = createDb()) {
+ putString(db, "key1", "value1");
+ putString(db, "key2", "value2");
+ putString(db, "key3", "value3");
+ db.flush();
+
+ deleteString(db, "key2");
+ db.flush();
+
+ // Before compaction, key2 should be deleted
+ Assertions.assertNull(getString(db, "key2"));
+
+ // Force compaction to push tombstones to max level where they are
removed
+ db.compact();
+
+ // After full compaction, key2 should still be deleted
+ Assertions.assertNull(getString(db, "key2"));
+ Assertions.assertEquals("value1", getString(db, "key1"));
+ Assertions.assertEquals("value3", getString(db, "key3"));
+ }
+ }
+
+ @Test
+ public void testManualCompaction() throws IOException {
+ try (SimpleLsmKvDb db = createDb()) {
+ // Create multiple SST files in L0
+ putString(db, "a", "1");
+ db.flush();
+ putString(db, "b", "2");
+ db.flush();
+ putString(db, "c", "3");
+ db.flush();
+
+ Assertions.assertEquals(3, db.getSstFileCount());
+ Assertions.assertEquals(3, db.getLevelFileCount(0));
+
+ db.compact();
+
+ // After full compaction, all data should be in the deepest level
+ Assertions.assertEquals(0, db.getLevelFileCount(0));
+ Assertions.assertTrue(db.getSstFileCount() > 0);
+
+ // All data should still be accessible
+ Assertions.assertEquals("1", getString(db, "a"));
+ Assertions.assertEquals("2", getString(db, "b"));
+ Assertions.assertEquals("3", getString(db, "c"));
+ }
+ }
+
+ @Test
+ public void testLargeDataSet() throws IOException {
+ try (SimpleLsmKvDb db = createDb()) {
+ int recordCount = 1000;
+ for (int i = 0; i < recordCount; i++) {
+ putString(db, String.format("key-%08d", i),
String.format("value-%08d", i));
+ }
+
+ // Verify all records
+ for (int i = 0; i < recordCount; i++) {
+ String expected = String.format("value-%08d", i);
+ String actual = getString(db, String.format("key-%08d", i));
+ Assertions.assertEquals(expected, actual, "Mismatch at index "
+ i);
+ }
+ }
+ }
+
+ @Test
+ public void testByteArrayKeyValue() throws IOException {
+ try (SimpleLsmKvDb db = createDb()) {
+ byte[] key = new byte[] {0x01, 0x02, 0x03};
+ byte[] value = new byte[] {0x0A, 0x0B, 0x0C, 0x0D};
+
+ db.put(key, value);
+
+ byte[] result = db.get(key);
+ Assertions.assertNotNull(result);
+ Assertions.assertArrayEquals(value, result);
+ }
+ }
+
+ @Test
+ public void testCloseFlushesMemTable() throws IOException {
+ File dbDir = new File(tempDir.toFile(), "close-test-db");
+ // Write data and close
+ SimpleLsmKvDb db =
+ SimpleLsmKvDb.builder(dbDir)
+ .memTableFlushThreshold(1024 * 1024) // large
threshold, won't auto-flush
+ .blockSize(256)
+ .cacheSize(4 * 1024 * 1024)
+ .level0FileNumCompactTrigger(10)
+ .compressOptions(new CompressOptions("none", 1))
+ .build();
+
+ putString(db, "persist-key", "persist-value");
+ Assertions.assertEquals(0, db.getSstFileCount());
+ db.close();
+
+ // After close, data should have been flushed to SST
+ Assertions.assertEquals(1, db.getSstFileCount());
+ }
+
+ @Test
+ public void testClosedDbThrowsException() throws IOException {
+ SimpleLsmKvDb db = createDb();
+ db.close();
+
+ Assertions.assertThrows(IllegalStateException.class, () ->
putString(db, "key", "value"));
+ Assertions.assertThrows(IllegalStateException.class, () ->
getString(db, "key"));
+ Assertions.assertThrows(IllegalStateException.class, () ->
deleteString(db, "key"));
+ }
+
+ @Test
+ public void testWithCompression() throws IOException {
+ SimpleLsmKvDb db =
+ SimpleLsmKvDb.builder(new File(tempDir.toFile(),
"compressed-db"))
+ .memTableFlushThreshold(512)
+ .blockSize(256)
+ .cacheSize(4 * 1024 * 1024)
+ .level0FileNumCompactTrigger(4)
+ .compressOptions(CompressOptions.defaultOptions())
+ .build();
+
+ try {
+ for (int i = 0; i < 200; i++) {
+ putString(
+ db,
+ String.format("compressed-key-%05d", i),
+ String.format("compressed-value-%05d", i));
+ }
+
+ for (int i = 0; i < 200; i++) {
+ String expected = String.format("compressed-value-%05d", i);
+ String actual = getString(db,
String.format("compressed-key-%05d", i));
+ Assertions.assertEquals(expected, actual);
+ }
+ } finally {
+ db.close();
+ }
+ }
+
+ @Test
+ public void testEmptyDb() throws IOException {
+ try (SimpleLsmKvDb db = createDb()) {
+ Assertions.assertNull(getString(db, "any-key"));
+ Assertions.assertEquals(0, db.getSstFileCount());
+ Assertions.assertEquals(0, db.getMemTableSize());
+ }
+ }
+
+ @Test
+ public void testFlushEmptyMemTable() throws IOException {
+ try (SimpleLsmKvDb db = createDb()) {
+ // Flushing an empty MemTable should be a no-op
+ db.flush();
+ Assertions.assertEquals(0, db.getSstFileCount());
+ }
+ }
+
+ @Test
+ public void testUniversalCompactionTriggeredByRunCount() throws
IOException {
+ // Compaction threshold = 3, so 3 sorted runs trigger compaction
+ SimpleLsmKvDb db =
+ SimpleLsmKvDb.builder(new File(tempDir.toFile(),
"universal-trigger-db"))
+ .memTableFlushThreshold(1024 * 1024)
+ .blockSize(256)
+ .cacheSize(4 * 1024 * 1024)
+ .level0FileNumCompactTrigger(3)
+ .compressOptions(new CompressOptions("none", 1))
+ .build();
+
+ try {
+ // Create 3 sorted runs (L0 files) to trigger universal compaction
+ putString(db, "aaa", "v1");
+ db.flush();
+ Assertions.assertEquals(1, db.getLevelFileCount(0));
+
+ putString(db, "bbb", "v2");
+ db.flush();
+ Assertions.assertEquals(2, db.getLevelFileCount(0));
+
+ putString(db, "ccc", "v3");
+ db.flush();
+ // After 3rd flush, universal compaction should have been triggered
+ // L0 should be cleared and data moved to a deeper level
+ Assertions.assertEquals(0, db.getLevelFileCount(0));
+ Assertions.assertTrue(db.getSstFileCount() > 0);
+
+ // All data should be accessible
+ Assertions.assertEquals("v1", getString(db, "aaa"));
+ Assertions.assertEquals("v2", getString(db, "bbb"));
+ Assertions.assertEquals("v3", getString(db, "ccc"));
+ } finally {
+ db.close();
+ }
+ }
+
+ @Test
+ public void testUniversalCompactionWithOverlappingKeys() throws
IOException {
+ SimpleLsmKvDb db =
+ SimpleLsmKvDb.builder(new File(tempDir.toFile(),
"universal-overlap-db"))
+ .memTableFlushThreshold(1024 * 1024)
+ .blockSize(256)
+ .cacheSize(4 * 1024 * 1024)
+ .level0FileNumCompactTrigger(3)
+ .compressOptions(new CompressOptions("none", 1))
+ .build();
+
+ try {
+ // Write overlapping keys across multiple flushes
+ putString(db, "key-a", "old-a");
+ putString(db, "key-b", "old-b");
+ db.flush();
+
+ putString(db, "key-a", "new-a");
+ putString(db, "key-c", "new-c");
+ db.flush();
+
+ putString(db, "key-b", "new-b");
+ putString(db, "key-d", "new-d");
+ db.flush();
+ // Universal compaction should have been triggered
+
+ // Newer values should win
+ Assertions.assertEquals("new-a", getString(db, "key-a"));
+ Assertions.assertEquals("new-b", getString(db, "key-b"));
+ Assertions.assertEquals("new-c", getString(db, "key-c"));
+ Assertions.assertEquals("new-d", getString(db, "key-d"));
+ } finally {
+ db.close();
+ }
+ }
+
+ @Test
+ public void testUniversalCompactionReducesFileCount() throws IOException {
+ SimpleLsmKvDb db =
+ SimpleLsmKvDb.builder(new File(tempDir.toFile(),
"universal-reduce-db"))
+ .memTableFlushThreshold(1024 * 1024)
+ .blockSize(256)
+ .cacheSize(4 * 1024 * 1024)
+ .level0FileNumCompactTrigger(3)
+ .compressOptions(new CompressOptions("none", 1))
+ .build();
+
+ try {
+ // Create 2 runs with overlapping keys (below threshold, no
compaction)
+ putString(db, "shared-key", "v1");
+ putString(db, "a", "1");
+ db.flush();
+ putString(db, "shared-key", "v2");
+ putString(db, "b", "2");
+ db.flush();
+ int fileCountBeforeCompaction = db.getSstFileCount();
+ Assertions.assertEquals(2, fileCountBeforeCompaction);
+
+ // 3rd flush triggers compaction; overlapping files get merged,
reducing count
+ putString(db, "shared-key", "v3");
+ putString(db, "c", "3");
+ db.flush();
+ int fileCountAfterCompaction = db.getSstFileCount();
+ Assertions.assertTrue(
+ fileCountAfterCompaction <= fileCountBeforeCompaction,
+ "Compaction should reduce or maintain file count, but got "
+ + fileCountAfterCompaction);
+
+ // Data integrity — newest value wins
+ Assertions.assertEquals("v3", getString(db, "shared-key"));
+ Assertions.assertEquals("1", getString(db, "a"));
+ Assertions.assertEquals("2", getString(db, "b"));
+ Assertions.assertEquals("3", getString(db, "c"));
+ } finally {
+ db.close();
+ }
+ }
+
+ @Test
+ public void testUniversalCompactionMultipleRounds() throws IOException {
+ // Low threshold to trigger compaction frequently
+ SimpleLsmKvDb db =
+ SimpleLsmKvDb.builder(new File(tempDir.toFile(),
"universal-multi-db"))
+ .memTableFlushThreshold(512)
+ .blockSize(128)
+ .cacheSize(4 * 1024 * 1024)
+ .level0FileNumCompactTrigger(3)
+ .sizeRatio(50)
+ .compressOptions(new CompressOptions("none", 1))
+ .build();
+
+ try {
+ // Write many batches to trigger multiple rounds of compaction
+ int totalKeys = 500;
+ for (int i = 0; i < totalKeys; i++) {
+ putString(db, String.format("key-%06d", i),
String.format("value-%06d", i));
+ }
+
+ // Verify all data is still correct after multiple compaction
rounds
+ for (int i = 0; i < totalKeys; i++) {
+ String expected = String.format("value-%06d", i);
+ String actual = getString(db, String.format("key-%06d", i));
+ Assertions.assertEquals(
+ expected, actual, "Mismatch for key-" +
String.format("%06d", i));
+ }
+ } finally {
+ db.close();
+ }
+ }
+
+ @Test
+ public void testUniversalCompactionPreservesTombstonesInPartialMerge()
throws IOException {
+ // Use a high compaction threshold so we can control when compaction
happens
+ SimpleLsmKvDb db =
+ SimpleLsmKvDb.builder(new File(tempDir.toFile(),
"universal-tombstone-db"))
+ .memTableFlushThreshold(1024 * 1024)
+ .blockSize(256)
+ .cacheSize(4 * 1024 * 1024)
+ .level0FileNumCompactTrigger(4)
+ .compressOptions(new CompressOptions("none", 1))
+ .build();
+
+ try {
+ // Create data and tombstones across multiple flushes
+ putString(db, "key-1", "value-1");
+ putString(db, "key-2", "value-2");
+ putString(db, "key-3", "value-3");
+ db.flush();
+
+ deleteString(db, "key-2");
+ db.flush();
+
+ putString(db, "key-4", "value-4");
+ db.flush();
+
+ // key-2 should be deleted (tombstone exists in SST)
+ Assertions.assertNull(getString(db, "key-2"));
+
+ // Now trigger compaction by adding one more flush
+ putString(db, "key-5", "value-5");
+ db.flush();
+ // 4 sorted runs should trigger compaction
+
+ // After compaction, deleted key should still be gone
+ Assertions.assertNull(getString(db, "key-2"));
+ Assertions.assertEquals("value-1", getString(db, "key-1"));
+ Assertions.assertEquals("value-3", getString(db, "key-3"));
+ Assertions.assertEquals("value-4", getString(db, "key-4"));
+ Assertions.assertEquals("value-5", getString(db, "key-5"));
+
+ // Full compaction should clean up tombstones completely
+ db.compact();
+ Assertions.assertNull(getString(db, "key-2"));
+ Assertions.assertEquals("value-1", getString(db, "key-1"));
+ } finally {
+ db.close();
+ }
+ }
+
+ @Test
+ public void testUniversalCompactionWithUpdatesAcrossRuns() throws
IOException {
+ SimpleLsmKvDb db =
+ SimpleLsmKvDb.builder(new File(tempDir.toFile(),
"universal-update-db"))
+ .memTableFlushThreshold(1024 * 1024)
+ .blockSize(256)
+ .cacheSize(4 * 1024 * 1024)
+ .level0FileNumCompactTrigger(3)
+ .compressOptions(new CompressOptions("none", 1))
+ .build();
+
+ try {
+ // Write same key across multiple sorted runs with different values
+ putString(db, "shared-key", "version-1");
+ db.flush();
+
+ putString(db, "shared-key", "version-2");
+ db.flush();
+
+ putString(db, "shared-key", "version-3");
+ db.flush();
+ // Compaction triggered
+
+ // The newest value should always win
+ Assertions.assertEquals("version-3", getString(db, "shared-key"));
+
+ // Write more updates and compact again
+ putString(db, "shared-key", "version-4");
+ db.flush();
+ putString(db, "shared-key", "version-5");
+ db.flush();
+ putString(db, "shared-key", "version-6");
+ db.flush();
+
+ Assertions.assertEquals("version-6", getString(db, "shared-key"));
+
+ // Full compaction should still preserve the latest value
+ db.compact();
+ Assertions.assertEquals("version-6", getString(db, "shared-key"));
+ } finally {
+ db.close();
+ }
+ }
+
+ @Test
+ public void testLargeScaleFlushCompactAndFullCompact() throws IOException {
+ // Very small thresholds to trigger flush, auto-compaction, and full
compaction frequently
+ SimpleLsmKvDb db =
+ SimpleLsmKvDb.builder(new File(tempDir.toFile(),
"large-scale-db"))
+ .memTableFlushThreshold(256)
+ .blockSize(64)
+ .cacheSize(4 * 1024 * 1024)
+ .level0FileNumCompactTrigger(3)
+ .sizeRatio(20)
+ .compressOptions(new CompressOptions("none", 1))
+ .build();
+
+ try {
+ int totalKeys = 2000;
+
+ // Phase 1: Insert keys, triggering many flushes and
auto-compactions
+ for (int i = 0; i < totalKeys; i++) {
+ putString(db, String.format("key-%08d", i),
String.format("value-%08d", i));
+ }
+
+ // Verify all keys are readable after many auto-compactions
+ for (int i = 0; i < totalKeys; i++) {
+ String expected = String.format("value-%08d", i);
+ String actual = getString(db, String.format("key-%08d", i));
+ Assertions.assertEquals(
+ expected, actual, "Phase 1 mismatch for key-" +
String.format("%08d", i));
+ }
+
+ // Phase 2: Overwrite half the keys
+ for (int i = 0; i < totalKeys / 2; i++) {
+ putString(db, String.format("key-%08d", i),
String.format("updated-%08d", i));
+ }
+
+ // Verify overwrites
+ for (int i = 0; i < totalKeys / 2; i++) {
+ String expected = String.format("updated-%08d", i);
+ String actual = getString(db, String.format("key-%08d", i));
+ Assertions.assertEquals(
+ expected,
+ actual,
+ "Phase 2 overwrite mismatch for key-" +
String.format("%08d", i));
+ }
+ for (int i = totalKeys / 2; i < totalKeys; i++) {
+ String expected = String.format("value-%08d", i);
+ String actual = getString(db, String.format("key-%08d", i));
+ Assertions.assertEquals(
+ expected,
+ actual,
+ "Phase 2 original mismatch for key-" +
String.format("%08d", i));
+ }
+
+ // Phase 3: Delete a quarter of the keys
+ for (int i = 0; i < totalKeys / 4; i++) {
+ deleteString(db, String.format("key-%08d", i));
+ }
+
+ // Verify deletes
+ for (int i = 0; i < totalKeys / 4; i++) {
+ Assertions.assertNull(
+ getString(db, String.format("key-%08d", i)),
+ "Phase 3 deleted key should be null: key-" +
String.format("%08d", i));
+ }
+
+ // Phase 4: Full compaction — merges all runs and cleans tombstones
+ int fileCountBefore = db.getSstFileCount();
+ db.compact();
+ int fileCountAfter = db.getSstFileCount();
+
+ Assertions.assertTrue(
+ fileCountAfter <= fileCountBefore,
+ "Full compaction should reduce file count: before="
+ + fileCountBefore
+ + " after="
+ + fileCountAfter);
+ Assertions.assertEquals(0, db.getLevelFileCount(0));
+
+ // Verify all data integrity after full compaction
+ for (int i = 0; i < totalKeys / 4; i++) {
+ Assertions.assertNull(
+ getString(db, String.format("key-%08d", i)),
+ "After compact, deleted key should be null: key-"
+ + String.format("%08d", i));
+ }
+ for (int i = totalKeys / 4; i < totalKeys / 2; i++) {
+ Assertions.assertEquals(
+ String.format("updated-%08d", i),
+ getString(db, String.format("key-%08d", i)),
+ "After compact, updated key mismatch: key-" +
String.format("%08d", i));
+ }
+ for (int i = totalKeys / 2; i < totalKeys; i++) {
+ Assertions.assertEquals(
+ String.format("value-%08d", i),
+ getString(db, String.format("key-%08d", i)),
+ "After compact, original key mismatch: key-" +
String.format("%08d", i));
+ }
+
+ // Phase 5: Write more data after full compaction to ensure DB is
still functional
+ for (int i = totalKeys; i < totalKeys + 500; i++) {
+ putString(db, String.format("key-%08d", i),
String.format("new-%08d", i));
+ }
+
+ for (int i = totalKeys; i < totalKeys + 500; i++) {
+ Assertions.assertEquals(
+ String.format("new-%08d", i),
+ getString(db, String.format("key-%08d", i)),
+ "Phase 5 new key mismatch: key-" +
String.format("%08d", i));
+ }
+ } finally {
+ db.close();
+ }
+ }
+
+ @Test
+ public void testCompactRemovesTombstonesAndMerges() throws IOException {
+ try (SimpleLsmKvDb db = createDb()) {
+ // Create data across multiple flushes
+ for (int i = 0; i < 50; i++) {
+ putString(db, String.format("key-%05d", i),
String.format("value-%05d", i));
+ }
+ db.flush();
+
+ for (int i = 25; i < 75; i++) {
+ putString(db, String.format("key-%05d", i),
String.format("updated-%05d", i));
+ }
+ db.flush();
+
+ // Delete some keys
+ for (int i = 0; i < 10; i++) {
+ deleteString(db, String.format("key-%05d", i));
+ }
+ db.flush();
+
+ // Full compaction should merge everything and remove tombstones
+ db.compact();
+
+ // Verify results
+ for (int i = 0; i < 10; i++) {
+ Assertions.assertNull(
+ getString(db, String.format("key-%05d", i)),
+ "Deleted key should be null: key-" + i);
+ }
+ for (int i = 10; i < 25; i++) {
+ Assertions.assertEquals(
+ String.format("value-%05d", i),
+ getString(db, String.format("key-%05d", i)));
+ }
+ for (int i = 25; i < 75; i++) {
+ Assertions.assertEquals(
+ String.format("updated-%05d", i),
+ getString(db, String.format("key-%05d", i)));
+ }
+ }
+ }
+
+ @Test
+ public void testLevelStats() throws IOException {
+ SimpleLsmKvDb db =
+ SimpleLsmKvDb.builder(new File(tempDir.toFile(), "stats-db"))
+ .memTableFlushThreshold(1024 * 1024)
+ .blockSize(256)
+ .cacheSize(4 * 1024 * 1024)
+ .level0FileNumCompactTrigger(10)
+ .compressOptions(new CompressOptions("none", 1))
+ .build();
+
+ try {
+ Assertions.assertEquals("empty", db.getLevelStats());
+
+ putString(db, "a", "1");
+ db.flush();
+ Assertions.assertTrue(db.getLevelStats().contains("L0=1"));
+
+ putString(db, "b", "2");
+ db.flush();
+ Assertions.assertTrue(db.getLevelStats().contains("L0=2"));
+ } finally {
+ db.close();
+ }
+ }
+
+ @Test
+ public void testCustomComparatorReverseOrder() throws IOException {
+ // Reverse comparator: keys are ordered in descending byte order
+ Comparator<MemorySlice> reverseComparator =
+ new Comparator<MemorySlice>() {
+ @Override
+ public int compare(MemorySlice a, MemorySlice b) {
+ return b.compareTo(a);
+ }
+ };
+
+ SimpleLsmKvDb db =
+ SimpleLsmKvDb.builder(new File(tempDir.toFile(), "reverse-db"))
+ .memTableFlushThreshold(1024 * 1024)
+ .blockSize(256)
+ .cacheSize(4 * 1024 * 1024)
+ .level0FileNumCompactTrigger(3)
+ .compressOptions(new CompressOptions("none", 1))
+ .keyComparator(reverseComparator)
+ .build();
+
+ try {
+ // Basic put/get with reverse comparator
+ putString(db, "aaa", "value-a");
+ putString(db, "bbb", "value-b");
+ putString(db, "ccc", "value-c");
+
+ Assertions.assertEquals("value-a", getString(db, "aaa"));
+ Assertions.assertEquals("value-b", getString(db, "bbb"));
+ Assertions.assertEquals("value-c", getString(db, "ccc"));
+
+ // Flush to SST and verify reads still work with reverse comparator
+ db.flush();
+ Assertions.assertEquals("value-a", getString(db, "aaa"));
+ Assertions.assertEquals("value-b", getString(db, "bbb"));
+ Assertions.assertEquals("value-c", getString(db, "ccc"));
+ Assertions.assertNull(getString(db, "ddd"));
+ } finally {
+ db.close();
+ }
+ }
+
+ @Test
+ public void testCustomComparatorWithCompaction() throws IOException {
+ // Reverse comparator to ensure compaction uses the custom comparator
+ Comparator<MemorySlice> reverseComparator =
+ new Comparator<MemorySlice>() {
+ @Override
+ public int compare(MemorySlice a, MemorySlice b) {
+ return b.compareTo(a);
+ }
+ };
+
+ SimpleLsmKvDb db =
+ SimpleLsmKvDb.builder(new File(tempDir.toFile(),
"reverse-compact-db"))
+ .memTableFlushThreshold(1024 * 1024)
+ .blockSize(256)
+ .cacheSize(4 * 1024 * 1024)
+ .level0FileNumCompactTrigger(3)
+ .compressOptions(new CompressOptions("none", 1))
+ .keyComparator(reverseComparator)
+ .build();
+
+ try {
+ // Create multiple L0 files with overlapping keys to trigger
compaction
+ putString(db, "key-a", "old-a");
+ putString(db, "key-b", "old-b");
+ db.flush();
+
+ putString(db, "key-a", "new-a");
+ putString(db, "key-c", "new-c");
+ db.flush();
+
+ putString(db, "key-b", "new-b");
+ putString(db, "key-d", "new-d");
+ db.flush();
+ // Compaction should have been triggered (threshold = 3)
+
+ // Newer values should win after compaction with reverse comparator
+ Assertions.assertEquals("new-a", getString(db, "key-a"));
+ Assertions.assertEquals("new-b", getString(db, "key-b"));
+ Assertions.assertEquals("new-c", getString(db, "key-c"));
+ Assertions.assertEquals("new-d", getString(db, "key-d"));
+ } finally {
+ db.close();
+ }
+ }
+
+ @Test
+ public void testCustomComparatorDeleteAcrossFlushAndCompact() throws
IOException {
+ // Reverse comparator
+ Comparator<MemorySlice> reverseComparator =
+ new Comparator<MemorySlice>() {
+ @Override
+ public int compare(MemorySlice a, MemorySlice b) {
+ return b.compareTo(a);
+ }
+ };
+
+ SimpleLsmKvDb db =
+ SimpleLsmKvDb.builder(new File(tempDir.toFile(),
"reverse-delete-db"))
+ .memTableFlushThreshold(1024 * 1024)
+ .blockSize(256)
+ .cacheSize(4 * 1024 * 1024)
+ .level0FileNumCompactTrigger(4)
+ .compressOptions(new CompressOptions("none", 1))
+ .keyComparator(reverseComparator)
+ .build();
+
+ try {
+ putString(db, "key-x", "value-x");
+ putString(db, "key-y", "value-y");
+ putString(db, "key-z", "value-z");
+ db.flush();
+
+ // Delete key-y via tombstone
+ deleteString(db, "key-y");
+ db.flush();
+
+ Assertions.assertNull(getString(db, "key-y"));
+ Assertions.assertEquals("value-x", getString(db, "key-x"));
+ Assertions.assertEquals("value-z", getString(db, "key-z"));
+
+ // Compaction should clean up tombstones
+ db.compact();
+
+ Assertions.assertNull(getString(db, "key-y"));
+ Assertions.assertEquals("value-x", getString(db, "key-x"));
+ Assertions.assertEquals("value-z", getString(db, "key-z"));
+ } finally {
+ db.close();
+ }
+ }
+
+ // ---- Tests for group-based merge optimization ----
+
+ @Test
+ public void testNonOverlappingFilesSkipMerge() throws IOException {
+ // Non-overlapping files should be kept as-is during compaction
(skipped groups).
+ SimpleLsmKvDb db =
+ SimpleLsmKvDb.builder(new File(tempDir.toFile(),
"non-overlap-skip-db"))
+ .memTableFlushThreshold(1024 * 1024)
+ .blockSize(256)
+ .cacheSize(4 * 1024 * 1024)
+ .level0FileNumCompactTrigger(3)
+ .compressOptions(new CompressOptions("none", 1))
+ .build();
+
+ try {
+ // Each flush creates a file with a distinct, non-overlapping key
range
+ putString(db, "aaa", "1");
+ db.flush();
+ putString(db, "mmm", "2");
+ db.flush();
+ putString(db, "zzz", "3");
+ db.flush(); // triggers compaction (threshold = 3)
+
+ // Data integrity must be preserved
+ Assertions.assertEquals("1", getString(db, "aaa"));
+ Assertions.assertEquals("2", getString(db, "mmm"));
+ Assertions.assertEquals("3", getString(db, "zzz"));
+ } finally {
+ db.close();
+ }
+ }
+
+ @Test
+ public void testOverlappingFilesAreMergedInGroups() throws IOException {
+ // Files with overlapping key ranges should be merged within the same
group.
+ SimpleLsmKvDb db =
+ SimpleLsmKvDb.builder(new File(tempDir.toFile(),
"overlap-group-db"))
+ .memTableFlushThreshold(1024 * 1024)
+ .blockSize(256)
+ .cacheSize(4 * 1024 * 1024)
+ .level0FileNumCompactTrigger(3)
+ .compressOptions(new CompressOptions("none", 1))
+ .build();
+
+ try {
+ // All three flushes share "shared-key", so all files overlap
+ putString(db, "shared-key", "v1");
+ putString(db, "aaa", "a1");
+ db.flush();
+ putString(db, "shared-key", "v2");
+ putString(db, "bbb", "b1");
+ db.flush();
+ putString(db, "shared-key", "v3");
+ putString(db, "ccc", "c1");
+ db.flush(); // triggers compaction
+
+ // Newest value wins for the shared key
+ Assertions.assertEquals("v3", getString(db, "shared-key"));
+ Assertions.assertEquals("a1", getString(db, "aaa"));
+ Assertions.assertEquals("b1", getString(db, "bbb"));
+ Assertions.assertEquals("c1", getString(db, "ccc"));
+ } finally {
+ db.close();
+ }
+ }
+
+ @Test
+ public void testMixedOverlapAndNonOverlapGroups() throws IOException {
+ // Some files overlap (forming one group) while others don't (separate
groups).
+ SimpleLsmKvDb db =
+ SimpleLsmKvDb.builder(new File(tempDir.toFile(),
"mixed-group-db"))
+ .memTableFlushThreshold(1024 * 1024)
+ .blockSize(256)
+ .cacheSize(4 * 1024 * 1024)
+ .level0FileNumCompactTrigger(4)
+ .compressOptions(new CompressOptions("none", 1))
+ .build();
+
+ try {
+ // Run 1: keys a-c (overlaps with run 2)
+ putString(db, "a", "a1");
+ putString(db, "b", "b1");
+ putString(db, "c", "c1");
+ db.flush();
+
+ // Run 2: keys b-d (overlaps with run 1, forms one group)
+ putString(db, "b", "b2");
+ putString(db, "d", "d1");
+ db.flush();
+
+ // Run 3: keys x-z (no overlap with runs 1-2, separate group)
+ putString(db, "x", "x1");
+ putString(db, "y", "y1");
+ putString(db, "z", "z1");
+ db.flush();
+
+ // Run 4: key m (no overlap, separate group) — triggers compaction
+ putString(db, "m", "m1");
+ db.flush();
+
+ // Verify data integrity: overlapping keys use newest value
+ Assertions.assertEquals("a1", getString(db, "a"));
+ Assertions.assertEquals("b2", getString(db, "b")); // newest wins
+ Assertions.assertEquals("c1", getString(db, "c"));
+ Assertions.assertEquals("d1", getString(db, "d"));
+ Assertions.assertEquals("m1", getString(db, "m"));
+ Assertions.assertEquals("x1", getString(db, "x"));
+ Assertions.assertEquals("y1", getString(db, "y"));
+ Assertions.assertEquals("z1", getString(db, "z"));
+ } finally {
+ db.close();
+ }
+ }
+
+ @Test
+ public void testTombstoneFileNotSkippedDuringFullCompact() throws
IOException {
+ // A non-overlapping file with tombstones should NOT be skipped during
full compaction
+ // (dropTombstones=true), ensuring tombstones are cleaned up.
+ SimpleLsmKvDb db =
+ SimpleLsmKvDb.builder(new File(tempDir.toFile(),
"tombstone-no-skip-db"))
+ .memTableFlushThreshold(1024 * 1024)
+ .blockSize(256)
+ .cacheSize(4 * 1024 * 1024)
+ .level0FileNumCompactTrigger(3)
+ .compressOptions(new CompressOptions("none", 1))
+ .build();
+
+ try {
+ putString(db, "key1", "value1");
+ putString(db, "key2", "value2");
+ db.flush();
+
+ // Delete key1 — creates a tombstone
+ deleteString(db, "key1");
+ db.flush();
+
+ // Full compaction should process the tombstone file even if it
doesn't overlap
+ db.compact();
+
+ Assertions.assertNull(getString(db, "key1"));
+ Assertions.assertEquals("value2", getString(db, "key2"));
+ } finally {
+ db.close();
+ }
+ }
+
+ @Test
+ public void testGroupMergeWithMultipleCompactionRounds() throws
IOException {
+ // Multiple rounds of compaction with mixed
overlapping/non-overlapping data.
+ SimpleLsmKvDb db =
+ SimpleLsmKvDb.builder(new File(tempDir.toFile(),
"multi-round-group-db"))
+ .memTableFlushThreshold(1024 * 1024)
+ .blockSize(256)
+ .cacheSize(4 * 1024 * 1024)
+ .level0FileNumCompactTrigger(3)
+ .compressOptions(new CompressOptions("none", 1))
+ .build();
+
+ try {
+ // Round 1: non-overlapping keys
+ putString(db, "a", "a1");
+ db.flush();
+ putString(db, "m", "m1");
+ db.flush();
+ putString(db, "z", "z1");
+ db.flush(); // triggers compaction
+
+ // Round 2: overlapping updates
+ putString(db, "a", "a2");
+ db.flush();
+ putString(db, "a", "a3");
+ putString(db, "b", "b1");
+ db.flush();
+ putString(db, "c", "c1");
+ db.flush(); // triggers compaction again
+
+ // Full compaction to consolidate everything
+ db.compact();
+
+ Assertions.assertEquals("a3", getString(db, "a"));
+ Assertions.assertEquals("b1", getString(db, "b"));
+ Assertions.assertEquals("c1", getString(db, "c"));
+ Assertions.assertEquals("m1", getString(db, "m"));
+ Assertions.assertEquals("z1", getString(db, "z"));
+ } finally {
+ db.close();
+ }
+ }
+
+ @Test
+ public void testCompactionMergesAllL0RunsAndIncludesL1() throws
IOException {
+ // Verify that size-ratio compaction always merges all L0 files and L1
data,
+ // so the total sorted run count never exceeds maxLevels.
+ SimpleLsmKvDb db =
+ SimpleLsmKvDb.builder(new File(tempDir.toFile(),
"l0-clear-db"))
+ .memTableFlushThreshold(1024 * 1024)
+ .blockSize(256)
+ .cacheSize(4 * 1024 * 1024)
+ .level0FileNumCompactTrigger(3)
+ .sizeRatio(1)
+ .compressOptions(new CompressOptions("none", 1))
+ .build();
+
+ try {
+ // Step 1: Create 3 L0 files to trigger first compaction
+ putString(db, "aaa", "v1");
+ db.flush();
+ putString(db, "bbb", "v2");
+ db.flush();
+ putString(db, "ccc", "v3");
+ db.flush();
+ // After compaction, data should exist in SST files
+ Assertions.assertTrue(
+ db.getSstFileCount() > 0, "SST files should exist after
compaction");
+
+ // Step 2: Create 3 more L0 files to trigger second compaction
+ // Now deeper levels already have data from the first compaction.
+ // The fix ensures L1 is included in the merge, preventing
overflow.
+ putString(db, "ddd", "v4");
+ db.flush();
+ putString(db, "eee", "v5");
+ db.flush();
+ putString(db, "fff", "v6");
+ db.flush();
+
+ // All data should be accessible after multiple compaction rounds
+ Assertions.assertEquals("v1", getString(db, "aaa"));
+ Assertions.assertEquals("v2", getString(db, "bbb"));
+ Assertions.assertEquals("v3", getString(db, "ccc"));
+ Assertions.assertEquals("v4", getString(db, "ddd"));
+ Assertions.assertEquals("v5", getString(db, "eee"));
+ Assertions.assertEquals("v6", getString(db, "fff"));
+
+ // Total file count across all levels should be reasonable (no
overflow)
+ int totalFiles = db.getSstFileCount();
+ Assertions.assertTrue(
+ totalFiles > 0 && totalFiles < 20,
+ "File count should be reasonable, got: " + totalFiles);
+ } finally {
+ db.close();
+ }
+ }
+
+ @Test
+ public void testCompactionWithManyRoundsNoOverflow() throws IOException {
+ // Stress test: many rounds of flush + compaction to ensure no
overflow ever occurs
+ // and data integrity is maintained throughout.
+ SimpleLsmKvDb db =
+ SimpleLsmKvDb.builder(new File(tempDir.toFile(), "many-l0-db"))
+ .memTableFlushThreshold(1024 * 1024)
+ .blockSize(256)
+ .cacheSize(4 * 1024 * 1024)
+ .level0FileNumCompactTrigger(3)
+ .sizeRatio(1)
+ .compressOptions(new CompressOptions("none", 1))
+ .build();
+
+ try {
+ int totalRounds = 10;
+ for (int round = 0; round < totalRounds; round++) {
+ for (int i = 0; i < 3; i++) {
+ String key = String.format("round-%02d-key-%02d", round,
i);
+ String value = String.format("value-%02d-%02d", round, i);
+ putString(db, key, value);
+ db.flush();
+ }
+ // After each batch of 3 flushes, compaction should have been
triggered.
+ // Verify that the number of occupied levels is within bounds.
+ int occupiedLevels = 0;
+ for (int level = 0; level < SimpleLsmKvDb.MAX_LEVELS; level++)
{
+ if (db.getLevelFileCount(level) > 0) {
+ occupiedLevels++;
+ }
+ }
+ Assertions.assertTrue(
+ occupiedLevels <= SimpleLsmKvDb.MAX_LEVELS,
+ "Occupied levels should not exceed MAX_LEVELS in round
"
+ + round
+ + ", got: "
+ + occupiedLevels);
+ }
+
+ // Verify all data is still accessible
+ for (int round = 0; round < totalRounds; round++) {
+ for (int i = 0; i < 3; i++) {
+ String key = String.format("round-%02d-key-%02d", round,
i);
+ String expected = String.format("value-%02d-%02d", round,
i);
+ Assertions.assertEquals(expected, getString(db, key),
"Mismatch for " + key);
+ }
+ }
+ } finally {
+ db.close();
+ }
+ }
+
+ @Test
+ public void testCompactionWithOverlappingKeysAcrossL0AndL1() throws
IOException {
+ // Verify that when L0 files have overlapping keys with existing level
data,
+ // compaction correctly deduplicates and the newest value wins.
+ SimpleLsmKvDb db =
+ SimpleLsmKvDb.builder(new File(tempDir.toFile(),
"overlap-l0-l1-db"))
+ .memTableFlushThreshold(1024 * 1024)
+ .blockSize(256)
+ .cacheSize(4 * 1024 * 1024)
+ .level0FileNumCompactTrigger(3)
+ .sizeRatio(1)
+ .compressOptions(new CompressOptions("none", 1))
+ .build();
+
+ try {
+ // First round: write and compact to push data into deeper levels
+ putString(db, "shared", "version-1");
+ putString(db, "only-old", "old-value");
+ db.flush();
+ putString(db, "aaa", "a1");
+ db.flush();
+ putString(db, "bbb", "b1");
+ db.flush();
+ // Compaction triggered
+
+ // Second round: overwrite "shared" key and trigger compaction
again
+ // This forces merge with overlapping keys across levels
+ putString(db, "shared", "version-2");
+ db.flush();
+ putString(db, "ccc", "c1");
+ db.flush();
+ putString(db, "ddd", "d1");
+ db.flush();
+ // Compaction triggered again
+
+ // Newest value should win
+ Assertions.assertEquals("version-2", getString(db, "shared"));
+ Assertions.assertEquals("old-value", getString(db, "only-old"));
+ Assertions.assertEquals("a1", getString(db, "aaa"));
+ Assertions.assertEquals("b1", getString(db, "bbb"));
+ Assertions.assertEquals("c1", getString(db, "ccc"));
+ Assertions.assertEquals("d1", getString(db, "ddd"));
+
+ // Third round: overwrite again and verify
+ putString(db, "shared", "version-3");
+ db.flush();
+ putString(db, "eee", "e1");
+ db.flush();
+ putString(db, "fff", "f1");
+ db.flush();
+
+ Assertions.assertEquals("version-3", getString(db, "shared"));
+ Assertions.assertEquals("old-value", getString(db, "only-old"));
+ Assertions.assertEquals("e1", getString(db, "eee"));
+ Assertions.assertEquals("f1", getString(db, "fff"));
+ } finally {
+ db.close();
+ }
+ }
+
+ @Test
+ public void testGroupMergePreservesDeleteSemantics() throws IOException {
+ // Ensure that deletes are correctly handled across group-based merge.
+ SimpleLsmKvDb db =
+ SimpleLsmKvDb.builder(new File(tempDir.toFile(),
"group-delete-db"))
+ .memTableFlushThreshold(1024 * 1024)
+ .blockSize(256)
+ .cacheSize(4 * 1024 * 1024)
+ .level0FileNumCompactTrigger(3)
+ .compressOptions(new CompressOptions("none", 1))
+ .build();
+
+ try {
+ // Write keys in different ranges
+ putString(db, "aaa", "v1");
+ putString(db, "bbb", "v2");
+ db.flush();
+
+ putString(db, "xxx", "v3");
+ putString(db, "yyy", "v4");
+ db.flush();
+
+ // Delete one key from each range
+ deleteString(db, "aaa");
+ deleteString(db, "xxx");
+ db.flush(); // triggers compaction
+
+ Assertions.assertNull(getString(db, "aaa"));
+ Assertions.assertEquals("v2", getString(db, "bbb"));
+ Assertions.assertNull(getString(db, "xxx"));
+ Assertions.assertEquals("v4", getString(db, "yyy"));
+
+ // Full compaction should clean up tombstones
+ db.compact();
+
+ Assertions.assertNull(getString(db, "aaa"));
+ Assertions.assertEquals("v2", getString(db, "bbb"));
+ Assertions.assertNull(getString(db, "xxx"));
+ Assertions.assertEquals("v4", getString(db, "yyy"));
+ } finally {
+ db.close();
+ }
+ }
+
+ private static void putString(SimpleLsmKvDb db, String key, String value)
throws IOException {
+ db.put(key.getBytes(UTF_8), value.getBytes(UTF_8));
+ }
+
+ private static String getString(SimpleLsmKvDb db, String key) throws
IOException {
+ byte[] bytes = db.get(key.getBytes(UTF_8));
+ if (bytes == null) {
+ return null;
+ }
+ return new String(bytes, UTF_8);
+ }
+
+ private static void deleteString(SimpleLsmKvDb db, String key) throws
IOException {
+ db.delete(key.getBytes(UTF_8));
+ }
+}