This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 271ad81c84 [core] Add bulk load for bootstrapping in clustering compact (#7497)
271ad81c84 is described below
commit 271ad81c843bb262e02da9f3eb435b6f680e53cb
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Mar 23 11:48:56 2026 +0800
[core] Add bulk load for bootstrapping in clustering compact (#7497)
Adds a bulkLoad() method to `SimpleLsmKvDb` that writes globally sorted
entries directly into SST files at the deepest level (L3), bypassing
MemTable/flush/compaction. This is used by
`ClusteringKeyIndex.bootstrap()` to efficiently populate the key index
during restore.
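
For illustration, here is a minimal usage sketch of the new bulkLoad() API. It mirrors the
added testBulkLoadMultipleSstFiles test; the temp-dir path, entry count, and the final get()
lookup are illustrative assumptions, not part of the patch.

import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.lookup.sort.db.SimpleLsmKvDb;

import java.io.File;
import java.io.IOException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static java.nio.charset.StandardCharsets.UTF_8;

public class BulkLoadSketch {
    public static void main(String[] args) throws IOException {
        // Entries must already be globally sorted by the DB's key comparator
        // (the builder default here), and the database must be empty.
        List<Map.Entry<byte[], byte[]>> entries = new ArrayList<>();
        for (int i = 0; i < 200; i++) {
            entries.add(
                    new AbstractMap.SimpleImmutableEntry<>(
                            String.format("key-%05d", i).getBytes(UTF_8),
                            String.format("value-%05d", i).getBytes(UTF_8)));
        }

        try (SimpleLsmKvDb db =
                SimpleLsmKvDb.builder(new File("/tmp/bulk-load-sketch")) // illustrative path
                        .memTableFlushThreshold(1024)
                        .maxSstFileSize(512) // small size so the load rolls over several SST files
                        .blockSize(128)
                        .level0FileNumCompactTrigger(4)
                        .compressOptions(new CompressOptions("none", 1))
                        .build()) {
            // Written straight to the deepest level, bypassing MemTable/flush/compaction.
            db.bulkLoad(entries.iterator());
            // Point lookups work as usual afterwards.
            byte[] value = db.get("key-00042".getBytes(UTF_8));
        }
    }
}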
---
.../paimon/lookup/sort/db/SimpleLsmKvDb.java | 86 +++
.../paimon/lookup/sort/db/SimpleLsmKvDbTest.java | 124 ++++
.../clustering/ClusteringCompactManager.java | 657 ++-------------------
...actManager.java => ClusteringFileRewriter.java} | 545 ++++-------------
.../compact/clustering/ClusteringKeyIndex.java | 317 ++++++++++
.../paimon/separated/ClusteringTableTest.java | 50 ++
6 files changed, 740 insertions(+), 1039 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDb.java b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDb.java
index 7ee095da86..8871fcab48 100644
--- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDb.java
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDb.java
@@ -38,6 +38,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -94,6 +95,7 @@ public class SimpleLsmKvDb implements Closeable {
private final SortLookupStoreFactory storeFactory;
private final Comparator<MemorySlice> keyComparator;
private final long memTableFlushThreshold;
+ private final long maxSstFileSize;
private final LsmCompactor compactor;
/** Active MemTable: key -> value bytes (empty byte[] = tombstone). */
@@ -127,6 +129,7 @@ public class SimpleLsmKvDb implements Closeable {
this.storeFactory = storeFactory;
this.keyComparator = keyComparator;
this.memTableFlushThreshold = memTableFlushThreshold;
+ this.maxSstFileSize = maxSstFileSize;
this.memTable = new TreeMap<>(keyComparator);
this.memTableSize = 0;
this.levels = new ArrayList<>();
@@ -225,6 +228,89 @@ public class SimpleLsmKvDb implements Closeable {
maybeFlushMemTable();
}
+ /**
+ * Bulk-load globally sorted entries directly into SST files at the deepest level, bypassing
+ * MemTable, flush, and compaction entirely. The database must be empty when this is called.
+ *
+ * @param sortedEntries an iterator of key-value pairs in sorted order (by the DB's key
+ * comparator)
+ */
+ public void bulkLoad(Iterator<Map.Entry<byte[], byte[]>> sortedEntries) throws IOException {
+ ensureOpen();
+ if (!memTable.isEmpty() || getSstFileCount() > 0) {
+ throw new IllegalStateException(
+ "bulkLoad requires an empty database (no memTable entries and no SST files)");
+ }
+
+ int targetLevel = MAX_LEVELS - 1;
+ List<SstFileMetadata> targetLevelFiles = levels.get(targetLevel);
+
+ SortLookupStoreWriter currentWriter = null;
+ File currentSstFile = null;
+ MemorySlice currentFileMinKey = null;
+ MemorySlice currentFileMaxKey = null;
+ long currentBatchSize = 0;
+
+ try {
+ while (sortedEntries.hasNext()) {
+ Map.Entry<byte[], byte[]> entry = sortedEntries.next();
+ byte[] key = entry.getKey();
+ byte[] value = entry.getValue();
+
+ if (currentWriter == null) {
+ currentSstFile = newSstFile();
+ currentWriter = storeFactory.createWriter(currentSstFile, null);
+ currentFileMinKey = MemorySlice.wrap(key);
+ currentBatchSize = 0;
+ }
+
+ currentWriter.put(key, value);
+ currentFileMaxKey = MemorySlice.wrap(key);
+ currentBatchSize += key.length + value.length;
+
+ if (currentBatchSize >= maxSstFileSize) {
+ currentWriter.close();
+ targetLevelFiles.add(
+ new SstFileMetadata(
+ currentSstFile,
+ currentFileMinKey,
+ currentFileMaxKey,
+ 0,
+ targetLevel));
+ currentWriter = null;
+ currentSstFile = null;
+ currentFileMinKey = null;
+ currentFileMaxKey = null;
+ }
+ }
+
+ if (currentWriter != null) {
+ currentWriter.close();
+ targetLevelFiles.add(
+ new SstFileMetadata(
+ currentSstFile,
+ currentFileMinKey,
+ currentFileMaxKey,
+ 0,
+ targetLevel));
+ }
+ } catch (IOException | RuntimeException e) {
+ if (currentWriter != null) {
+ try {
+ currentWriter.close();
+ } catch (IOException suppressed) {
+ e.addSuppressed(suppressed);
+ }
+ }
+ throw e;
+ }
+
+ LOG.info(
+ "Bulk-loaded {} SST files directly to level {}",
+ targetLevelFiles.size(),
+ targetLevel);
+ }
+
// -------------------------------------------------------------------------
// Read Operations
// -------------------------------------------------------------------------
diff --git a/paimon-common/src/test/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDbTest.java b/paimon-common/src/test/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDbTest.java
index 52dbfd3d01..44d7d46ff5 100644
--- a/paimon-common/src/test/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDbTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/lookup/sort/db/SimpleLsmKvDbTest.java
@@ -28,7 +28,11 @@ import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.ArrayList;
import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -1321,6 +1325,126 @@ public class SimpleLsmKvDbTest {
}
}
+ @Test
+ public void testBulkLoad() throws IOException {
+ try (SimpleLsmKvDb db = createDb()) {
+ // Prepare sorted entries
+ List<Map.Entry<byte[], byte[]>> entries = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ String key = String.format("key-%05d", i);
+ String value = String.format("value-%05d", i);
+ entries.add(
+ new AbstractMap.SimpleImmutableEntry<>(
+ key.getBytes(UTF_8), value.getBytes(UTF_8)));
+ }
+
+ db.bulkLoad(entries.iterator());
+
+ // All data at deepest level, no L0 files
+ Assertions.assertEquals(0, db.getLevelFileCount(0));
+
+ Assertions.assertTrue(db.getLevelFileCount(SimpleLsmKvDb.MAX_LEVELS - 1) > 0);
+
+ // All keys should be readable
+ for (int i = 0; i < 100; i++) {
+ String expected = String.format("value-%05d", i);
+ String actual = getString(db, String.format("key-%05d", i));
+ Assertions.assertEquals(expected, actual, "Mismatch at index " + i);
+ }
+ }
+ }
+
+ @Test
+ public void testBulkLoadMultipleSstFiles() throws IOException {
+ // Use a small maxSstFileSize to force multiple SST files
+ SimpleLsmKvDb db =
+ SimpleLsmKvDb.builder(new File(tempDir.toFile(), "bulk-multi-db"))
+ .memTableFlushThreshold(1024)
+ .maxSstFileSize(512)
+ .blockSize(128)
+ .level0FileNumCompactTrigger(4)
+ .compressOptions(new CompressOptions("none", 1))
+ .build();
+
+ try {
+ List<Map.Entry<byte[], byte[]>> entries = new ArrayList<>();
+ for (int i = 0; i < 200; i++) {
+ String key = String.format("key-%05d", i);
+ String value = String.format("value-%05d", i);
+ entries.add(
+ new AbstractMap.SimpleImmutableEntry<>(
+ key.getBytes(UTF_8), value.getBytes(UTF_8)));
+ }
+
+ db.bulkLoad(entries.iterator());
+
+ // Multiple SST files should be created at the deepest level
+ int deepestLevelFiles = db.getLevelFileCount(SimpleLsmKvDb.MAX_LEVELS - 1);
+ Assertions.assertTrue(
+ deepestLevelFiles > 1,
+ "Expected multiple SST files at deepest level, got " +
deepestLevelFiles);
+ Assertions.assertEquals(0, db.getLevelFileCount(0));
+
+ // All keys should be readable
+ for (int i = 0; i < 200; i++) {
+ String expected = String.format("value-%05d", i);
+ String actual = getString(db, String.format("key-%05d", i));
+ Assertions.assertEquals(expected, actual, "Mismatch at index " + i);
+ }
+ } finally {
+ db.close();
+ }
+ }
+
+ @Test
+ public void testBulkLoadEmptyIterator() throws IOException {
+ try (SimpleLsmKvDb db = createDb()) {
+ List<Map.Entry<byte[], byte[]>> empty = new ArrayList<>();
+ db.bulkLoad(empty.iterator());
+
+ Assertions.assertEquals(0, db.getSstFileCount());
+ Assertions.assertNull(getString(db, "any-key"));
+ }
+ }
+
+ @Test
+ public void testBulkLoadThenPutAndGet() throws IOException {
+ try (SimpleLsmKvDb db = createDb()) {
+ // Bulk load initial data
+ List<Map.Entry<byte[], byte[]>> entries = new ArrayList<>();
+ for (int i = 0; i < 50; i++) {
+ String key = String.format("key-%05d", i);
+ String value = String.format("value-%05d", i);
+ entries.add(
+ new AbstractMap.SimpleImmutableEntry<>(
+ key.getBytes(UTF_8), value.getBytes(UTF_8)));
+ }
+ db.bulkLoad(entries.iterator());
+
+ // Now use normal put to add/overwrite data
+ putString(db, "key-00000", "overwritten");
+ putString(db, "key-99999", "new-key");
+
+ Assertions.assertEquals("overwritten", getString(db, "key-00000"));
+ Assertions.assertEquals("new-key", getString(db, "key-99999"));
+ Assertions.assertEquals("value-00025", getString(db,
String.format("key-%05d", 25)));
+ }
+ }
+
+ @Test
+ public void testBulkLoadFailsOnNonEmptyDb() throws IOException {
+ try (SimpleLsmKvDb db = createDb()) {
+ putString(db, "existing", "data");
+
+ List<Map.Entry<byte[], byte[]>> entries = new ArrayList<>();
+ entries.add(
+ new AbstractMap.SimpleImmutableEntry<>(
+ "key".getBytes(UTF_8), "value".getBytes(UTF_8)));
+
+ Assertions.assertThrows(
+ IllegalStateException.class, () ->
db.bulkLoad(entries.iterator()));
+ }
+ }
+
private static void putString(SimpleLsmKvDb db, String key, String value) throws IOException {
db.put(key.getBytes(UTF_8), value.getBytes(UTF_8));
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
index 8c8806daeb..9e925d0764 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
@@ -27,57 +27,29 @@ import org.apache.paimon.compact.CompactDeletionFile;
import org.apache.paimon.compact.CompactFutureManager;
import org.apache.paimon.compact.CompactResult;
import org.apache.paimon.compact.CompactTask;
-import org.apache.paimon.compression.BlockCompressionFactory;
import org.apache.paimon.compression.CompressOptions;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.serializer.BinaryRowSerializer;
-import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.data.serializer.RowCompactedSerializer;
import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
-import org.apache.paimon.disk.ChannelReaderInputView;
-import org.apache.paimon.disk.ChannelReaderInputViewIterator;
-import org.apache.paimon.disk.ChannelWithMeta;
-import org.apache.paimon.disk.ChannelWriterOutputView;
-import org.apache.paimon.disk.FileChannelUtil;
-import org.apache.paimon.disk.FileIOChannel;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
-import org.apache.paimon.io.RollingFileWriter;
import org.apache.paimon.io.cache.CacheManager;
import org.apache.paimon.lookup.sort.db.SimpleLsmKvDb;
import org.apache.paimon.operation.metrics.CompactionMetrics;
-import org.apache.paimon.options.MemorySize;
-import org.apache.paimon.reader.FileRecordIterator;
-import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.reader.RecordReader.RecordIterator;
-import org.apache.paimon.sort.BinaryExternalSortBuffer;
import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.CloseableIterator;
-import org.apache.paimon.utils.KeyValueWithLevelNoReusingSerializer;
-import org.apache.paimon.utils.MutableObjectIterator;
import javax.annotation.Nullable;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
import java.util.List;
import java.util.Optional;
-import java.util.PriorityQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.stream.IntStream;
import static java.util.Collections.singletonList;
-import static org.apache.paimon.utils.VarLengthIntUtils.decodeInt;
-import static org.apache.paimon.utils.VarLengthIntUtils.encodeInt;
/**
* Key Value clustering compact manager for {@link KeyValueFileStore}.
@@ -96,27 +68,14 @@ public class ClusteringCompactManager extends CompactFutureManager {
private final RowType keyType;
private final RowType valueType;
- private final long sortSpillBufferSize;
- private final int pageSize;
- private final int maxNumFileHandles;
- private final int spillThreshold;
- private final CompressOptions compression;
- private final int[] clusteringColumns;
- private final RecordComparator clusteringComparatorAlone;
- private final RecordComparator clusteringComparatorInValue;
- private final IOManager ioManager;
- private final KeyValueFileReaderFactory keyReaderFactory;
- private final KeyValueFileReaderFactory valueReaderFactory;
- private final KeyValueFileWriterFactory writerFactory;
private final ExecutorService executor;
private final BucketedDvMaintainer dvMaintainer;
- private final SimpleLsmKvDb kvDb;
private final boolean lazyGenDeletionFile;
- private final boolean firstRow;
@Nullable private final CompactionMetrics.Reporter metricsReporter;
private final ClusteringFiles fileLevels;
- private final long targetFileSize;
+ private final ClusteringKeyIndex keyIndex;
+ private final ClusteringFileRewriter fileRewriter;
public ClusteringCompactManager(
RowType keyType,
@@ -139,78 +98,64 @@ public class ClusteringCompactManager extends CompactFutureManager {
CompressOptions compression,
boolean firstRow,
@Nullable CompactionMetrics.Reporter metricsReporter) {
- this.targetFileSize = targetFileSize;
this.keyType = keyType;
this.valueType = valueType;
- this.sortSpillBufferSize = sortSpillBufferSize;
- this.pageSize = pageSize;
- this.maxNumFileHandles = maxNumFileHandles;
- this.spillThreshold = spillThreshold;
- this.compression = compression;
- this.firstRow = firstRow;
- this.clusteringColumns = valueType.projectIndexes(clusteringColumns);
- this.clusteringComparatorAlone =
- CodeGenUtils.newRecordComparator(
- valueType.project(clusteringColumns).getFieldTypes(),
- IntStream.range(0, clusteringColumns.size()).toArray(),
- true);
- this.clusteringComparatorInValue =
- CodeGenUtils.newRecordComparator(
- valueType.getFieldTypes(), this.clusteringColumns, true);
- this.ioManager = ioManager;
- this.keyReaderFactory = keyReaderFactory;
- this.valueReaderFactory = valueReaderFactory;
- this.writerFactory = writerFactory;
this.executor = executor;
this.dvMaintainer = dvMaintainer;
this.lazyGenDeletionFile = lazyGenDeletionFile;
this.metricsReporter = metricsReporter;
+
this.fileLevels = new ClusteringFiles();
restoreFiles.forEach(this::addNewFile);
- this.kvDb =
+ int[] clusteringColumnIndexes = valueType.projectIndexes(clusteringColumns);
+ RecordComparator clusteringComparatorAlone =
+ CodeGenUtils.newRecordComparator(
+ valueType.project(clusteringColumns).getFieldTypes(),
+ IntStream.range(0, clusteringColumns.size()).toArray(),
+ true);
+ RecordComparator clusteringComparatorInValue =
+ CodeGenUtils.newRecordComparator(
+ valueType.getFieldTypes(), clusteringColumnIndexes, true);
+
+ SimpleLsmKvDb kvDb =
SimpleLsmKvDb.builder(new File(ioManager.pickRandomTempDir()))
.cacheManager(cacheManager)
.keyComparator(new RowCompactedSerializer(keyType).createSliceComparator())
.build();
- bootstrapKeyIndex(restoreFiles);
- }
-
- private void bootstrapKeyIndex(List<DataFileMeta> restoreFiles) {
- RowCompactedSerializer keySerializer = new RowCompactedSerializer(keyType);
- for (DataFileMeta file : restoreFiles) {
- if (file.level() == 0) {
- continue;
- }
- int fileId = fileLevels.getFileIdByName(file.fileName());
- // Read with DV (auto-skips deleted rows). Use FileRecordIterator.returnedPosition()
- // to get correct physical positions even after DV filtering.
- try (RecordReader<KeyValue> reader =
keyReaderFactory.createRecordReader(file)) {
- FileRecordIterator<KeyValue> batch;
- while ((batch = (FileRecordIterator<KeyValue>)
reader.readBatch()) != null) {
- KeyValue kv;
- while ((kv = batch.next()) != null) {
- int position = (int) batch.returnedPosition();
- byte[] keyBytes = keySerializer.serializeToBytes(kv.key());
- ByteArrayOutputStream value = new ByteArrayOutputStream(8);
- encodeInt(value, fileId);
- encodeInt(value, position);
- kvDb.put(keyBytes, value.toByteArray());
- }
- batch.releaseBatch();
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
- private CloseableIterator<InternalRow> readKeyIterator(DataFileMeta file)
throws IOException {
- //noinspection resource
- return keyReaderFactory
- .createRecordReader(file)
- .transform(KeyValue::key)
- .toCloseableIterator();
+ this.keyIndex =
+ new ClusteringKeyIndex(
+ keyType,
+ ioManager,
+ keyReaderFactory,
+ dvMaintainer,
+ kvDb,
+ fileLevels,
+ firstRow,
+ sortSpillBufferSize,
+ pageSize,
+ maxNumFileHandles,
+ compression);
+ keyIndex.bootstrap(restoreFiles);
+
+ this.fileRewriter =
+ new ClusteringFileRewriter(
+ keyType,
+ valueType,
+ clusteringColumnIndexes,
+ clusteringComparatorAlone,
+ clusteringComparatorInValue,
+ ioManager,
+ valueReaderFactory,
+ writerFactory,
+ fileLevels,
+ targetFileSize,
+ sortSpillBufferSize,
+ pageSize,
+ maxNumFileHandles,
+ spillThreshold,
+ compression);
}
@Override
@@ -246,7 +191,6 @@ public class ClusteringCompactManager extends CompactFutureManager {
}
private CompactResult compact(boolean fullCompaction) throws Exception {
- RowCompactedSerializer keySerializer = new RowCompactedSerializer(keyType);
KeyValueSerializer kvSerializer = new KeyValueSerializer(keyType, valueType);
RowType kvSchemaType = KeyValue.schema(keyType, valueType);
@@ -258,25 +202,32 @@ public class ClusteringCompactManager extends CompactFutureManager {
List<DataFileMeta> existingSortedFiles = fileLevels.sortedFiles();
for (DataFileMeta file : unsortedFiles) {
List<DataFileMeta> sortedFiles =
- sortAndRewriteFiles(singletonList(file), kvSerializer, kvSchemaType);
- updateKeyIndex(keySerializer, file, sortedFiles);
+ fileRewriter.sortAndRewriteFiles(
+ singletonList(file), kvSerializer, kvSchemaType);
+ keyIndex.updateIndex(file, sortedFiles);
result.before().add(file);
result.after().addAll(sortedFiles);
}
// Phase 2: Universal Compaction on sorted files that existed before Phase 1.
- // Files produced by Phase 1 are excluded to avoid the same file appearing in both
- // result.before() and result.after().
List<List<DataFileMeta>> mergeGroups;
if (fullCompaction) {
mergeGroups = singletonList(existingSortedFiles);
} else {
- mergeGroups = pickMergeCandidates(existingSortedFiles);
+ mergeGroups = fileRewriter.pickMergeCandidates(existingSortedFiles);
}
for (List<DataFileMeta> mergeGroup : mergeGroups) {
if (mergeGroup.size() >= 2) {
- List<DataFileMeta> mergedFiles =
mergeAndRewriteFiles(mergeGroup, keySerializer);
+ // Delete key index entries before merge
+ for (DataFileMeta file : mergeGroup) {
+ keyIndex.deleteIndex(file);
+ }
+ List<DataFileMeta> mergedFiles =
fileRewriter.mergeAndRewriteFiles(mergeGroup);
+ // Rebuild key index for new files
+ for (DataFileMeta newFile : mergedFiles) {
+ keyIndex.rebuildIndex(newFile);
+ }
result.before().addAll(mergeGroup);
result.after().addAll(mergedFiles);
}
@@ -290,494 +241,6 @@ public class ClusteringCompactManager extends CompactFutureManager {
return result;
}
- /**
- * Pick merge candidate groups based on clustering column range overlap
and file sizes.
- *
- * <ol>
- * <li><b>Group into sections</b>: Files are sorted by minKey and
grouped into sections based
- * on clustering column key range overlap. Overlapping files belong
to the same section.
- * <li><b>Merge adjacent sections</b>: Sections that have overlapping
files (size >= 2) or
- * are small (total size < targetFileSize/2) are accumulated
together. Large
- * single-file sections act as barriers, flushing accumulated files
into a merge group.
- * </ol>
- *
- * @param sortedFiles all sorted files
- * @return list of merge groups; each group contains files to merge
together
- */
- private List<List<DataFileMeta>> pickMergeCandidates(List<DataFileMeta>
sortedFiles) {
- if (sortedFiles.size() < 2) {
- return java.util.Collections.emptyList();
- }
-
- // Step 1: Group files into sections based on clustering column range
overlap.
- List<List<DataFileMeta>> sections = groupIntoSections(sortedFiles);
-
- // Step 2: Merge adjacent sections when beneficial to reduce small
files.
- // A section should be merged if it has overlapping files (size >= 2)
or is small.
- long smallSectionThreshold = targetFileSize / 2;
- List<List<DataFileMeta>> mergeGroups = new ArrayList<>();
- List<DataFileMeta> pending = new ArrayList<>();
-
- for (List<DataFileMeta> section : sections) {
- boolean needsMerge = section.size() >= 2;
- boolean isSmall = sectionSize(section) < smallSectionThreshold;
-
- if (needsMerge || isSmall) {
- // This section should be merged, accumulate it
- pending.addAll(section);
- } else {
- // This section is a single large file, flush pending if any
- if (pending.size() >= 2) {
- mergeGroups.add(new ArrayList<>(pending));
- }
- pending.clear();
- }
- }
-
- // Flush remaining pending files
- if (pending.size() >= 2) {
- mergeGroups.add(pending);
- }
-
- return mergeGroups;
- }
-
- private long sectionSize(List<DataFileMeta> section) {
- long total = 0;
- for (DataFileMeta file : section) {
- total += file.fileSize();
- }
- return total;
- }
-
- /**
- * Group files into sections based on clustering column key range overlap.
Files are first
- * sorted by minKey, then adjacent files with overlapping ranges are
grouped into the same
- * section.
- *
- * @param files input files
- * @return list of sections, each section contains overlapping files
- */
- private List<List<DataFileMeta>> groupIntoSections(List<DataFileMeta>
files) {
- // Sort files by minKey to properly detect overlapping ranges
- List<DataFileMeta> sorted = new ArrayList<>(files);
- sorted.sort((a, b) -> clusteringComparatorAlone.compare(a.minKey(),
b.minKey()));
-
- List<List<DataFileMeta>> sections = new ArrayList<>();
- List<DataFileMeta> currentSection = new ArrayList<>();
- currentSection.add(sorted.get(0));
- BinaryRow currentMax = sorted.get(0).maxKey();
-
- for (int i = 1; i < sorted.size(); i++) {
- DataFileMeta file = sorted.get(i);
- if (clusteringComparatorAlone.compare(currentMax, file.minKey())
>= 0) {
- // Overlaps with current section
- currentSection.add(file);
- if (clusteringComparatorAlone.compare(file.maxKey(),
currentMax) > 0) {
- currentMax = file.maxKey();
- }
- } else {
- sections.add(currentSection);
- currentSection = new ArrayList<>();
- currentSection.add(file);
- currentMax = file.maxKey();
- }
- }
- sections.add(currentSection);
- return sections;
- }
-
- /**
- * Update the key index for a single original file replaced by new sorted
files. Marks old key
- * positions in deletion vectors and registers new positions.
- */
- private void updateKeyIndex(
- RowCompactedSerializer keySerializer,
- DataFileMeta originalFile,
- List<DataFileMeta> newSortedFiles)
- throws Exception {
- updateKeyIndex(keySerializer, singletonList(originalFile),
newSortedFiles);
- }
-
- /**
- * Update the key index for multiple original files replaced by new sorted
files.
- *
- * <p>For DEDUPLICATE mode: mark the old position in deletion vectors,
keep the new position.
- *
- * <p>For FIRST_ROW mode: if key exists, mark the new position in deletion
vectors (keep the
- * first/old one); if key is new, store the new position.
- */
- private void updateKeyIndex(
- RowCompactedSerializer keySerializer,
- List<DataFileMeta> originalFiles,
- List<DataFileMeta> newSortedFiles)
- throws Exception {
- // Collect file names of original files to avoid self-deletion marking
- java.util.Set<String> originalFileNames = new java.util.HashSet<>();
- for (DataFileMeta file : originalFiles) {
- originalFileNames.add(file.fileName());
- }
-
- for (DataFileMeta sortedFile : newSortedFiles) {
- int fileId = fileLevels.getFileIdByName(sortedFile.fileName());
- int position = 0;
- try (CloseableIterator<InternalRow> iterator =
readKeyIterator(sortedFile)) {
- while (iterator.hasNext()) {
- byte[] key =
keySerializer.serializeToBytes(iterator.next());
- byte[] oldValue = kvDb.get(key);
- if (oldValue != null) {
- ByteArrayInputStream valueIn = new
ByteArrayInputStream(oldValue);
- int oldFileId = decodeInt(valueIn);
- int oldPosition = decodeInt(valueIn);
- DataFileMeta oldFile =
fileLevels.getFileById(oldFileId);
- if (oldFile != null &&
!originalFileNames.contains(oldFile.fileName())) {
- if (firstRow) {
- // First-row mode: keep the old (first)
record, delete the new one
-
dvMaintainer.notifyNewDeletion(sortedFile.fileName(), position);
- position++;
- continue;
- } else {
- // Deduplicate mode: keep the new record,
delete the old one
-
dvMaintainer.notifyNewDeletion(oldFile.fileName(), oldPosition);
- }
- }
- }
- ByteArrayOutputStream value = new ByteArrayOutputStream(8);
- encodeInt(value, fileId);
- encodeInt(value, position);
- kvDb.put(key, value.toByteArray());
- position++;
- }
- }
- }
- }
-
- /**
- * Sort and rewrite one or more unsorted files by clustering columns.
Reads all KeyValue records
- * from the input files, sorts them using an external sort buffer, and
writes to new level-1
- * files.
- */
- private List<DataFileMeta> sortAndRewriteFiles(
- List<DataFileMeta> inputFiles, KeyValueSerializer kvSerializer,
RowType kvSchemaType)
- throws Exception {
- int[] sortFieldsInKeyValue =
- Arrays.stream(clusteringColumns)
- .map(i -> i + keyType.getFieldCount() + 2)
- .toArray();
- BinaryExternalSortBuffer sortBuffer =
- BinaryExternalSortBuffer.create(
- ioManager,
- kvSchemaType,
- sortFieldsInKeyValue,
- sortSpillBufferSize,
- pageSize,
- maxNumFileHandles,
- compression,
- MemorySize.MAX_VALUE,
- false);
-
- for (DataFileMeta file : inputFiles) {
- try (RecordReader<KeyValue> reader =
valueReaderFactory.createRecordReader(file)) {
- try (CloseableIterator<KeyValue> iterator =
reader.toCloseableIterator()) {
- while (iterator.hasNext()) {
- KeyValue kv = iterator.next();
- InternalRow serializedRow = kvSerializer.toRow(kv);
- sortBuffer.write(serializedRow);
- }
- }
- }
- }
-
- RollingFileWriter<KeyValue, DataFileMeta> writer =
- writerFactory.createRollingClusteringFileWriter();
- try {
- MutableObjectIterator<BinaryRow> sortedIterator =
sortBuffer.sortedIterator();
- BinaryRow binaryRow = new BinaryRow(kvSchemaType.getFieldCount());
- while ((binaryRow = sortedIterator.next(binaryRow)) != null) {
- KeyValue kv = kvSerializer.fromRow(binaryRow);
- writer.write(
- kv.copy(
- new InternalRowSerializer(keyType),
- new InternalRowSerializer(valueType)));
- }
- } finally {
- sortBuffer.clear();
- writer.close();
- }
-
- List<DataFileMeta> newFiles = writer.result();
- for (DataFileMeta file : inputFiles) {
- fileLevels.removeFile(file);
- }
- for (DataFileMeta newFile : newFiles) {
- fileLevels.addNewFile(newFile);
- }
-
- return newFiles;
- }
-
- /**
- * Merge sorted files using min-heap based multi-way merge. Since all
input files are already
- * sorted by clustering columns, we use a PriorityQueue to merge them
efficiently without
- * re-sorting. Key index entries are deleted during reading and rebuilt
after writing.
- *
- * <p>When the number of input files exceeds spillThreshold, smaller files
are spilled to
- * row-based temp files first. Row-based iterators consume much less
memory than columnar file
- * readers.
- */
- private List<DataFileMeta> mergeAndRewriteFiles(
- List<DataFileMeta> inputFiles, RowCompactedSerializer
keySerializer) throws Exception {
- InternalRowSerializer keyRowSerializer = new
InternalRowSerializer(keyType);
- InternalRowSerializer valueRowSerializer = new
InternalRowSerializer(valueType);
-
- // Delete key index entries for all input files before reading
- for (DataFileMeta file : inputFiles) {
- deleteKeyIndexForFile(keySerializer, file);
- }
-
- // Determine which files to spill to row-based temp files
- List<DataFileMeta> filesToSpill = new ArrayList<>();
- List<DataFileMeta> filesToKeep = new ArrayList<>();
- if (inputFiles.size() > spillThreshold) {
- List<DataFileMeta> sortedBySize = new ArrayList<>(inputFiles);
-
sortedBySize.sort(Comparator.comparingLong(DataFileMeta::fileSize));
- int spillCount = inputFiles.size() - spillThreshold;
- filesToSpill = new ArrayList<>(sortedBySize.subList(0,
spillCount));
- filesToKeep = new ArrayList<>(sortedBySize.subList(spillCount,
sortedBySize.size()));
- } else {
- filesToKeep = inputFiles;
- }
-
- // Spill smaller files to row-based temp files
- List<SpilledChannel> spilledChannels = new ArrayList<>();
- for (DataFileMeta file : filesToSpill) {
- spilledChannels.add(spillToRowBasedFile(file));
- }
-
- // Open iterators and initialize the min-heap
- List<CloseableIterator<KeyValue>> openIterators = new ArrayList<>();
- PriorityQueue<MergeEntry> minHeap =
- new PriorityQueue<>(
- (a, b) ->
- clusteringComparatorInValue.compare(
- a.currentKeyValue.value(),
b.currentKeyValue.value()));
-
- try {
- // Add iterators for columnar files (kept in memory)
- for (DataFileMeta file : filesToKeep) {
- @SuppressWarnings("resource")
- CloseableIterator<KeyValue> iterator =
-
valueReaderFactory.createRecordReader(file).toCloseableIterator();
- openIterators.add(iterator);
- if (iterator.hasNext()) {
- KeyValue firstKv = iterator.next().copy(keyRowSerializer,
valueRowSerializer);
- minHeap.add(new MergeEntry(firstKv, iterator));
- }
- }
-
- // Add iterators for row-based spilled files (low memory
consumption)
- for (SpilledChannel spilled : spilledChannels) {
- CloseableIterator<KeyValue> iterator =
spilled.createIterator();
- openIterators.add(iterator);
- if (iterator.hasNext()) {
- KeyValue firstKv = iterator.next().copy(keyRowSerializer,
valueRowSerializer);
- minHeap.add(new MergeEntry(firstKv, iterator));
- }
- }
-
- // Multi-way merge: write records in sorted order
- RollingFileWriter<KeyValue, DataFileMeta> writer =
- writerFactory.createRollingClusteringFileWriter();
- try {
- while (!minHeap.isEmpty()) {
- MergeEntry entry = minHeap.poll();
- writer.write(entry.currentKeyValue);
- if (entry.iterator.hasNext()) {
- entry.currentKeyValue =
- entry.iterator.next().copy(keyRowSerializer,
valueRowSerializer);
- minHeap.add(entry);
- }
- }
- } finally {
- writer.close();
- }
-
- // Remove original files and register new sorted files
- List<DataFileMeta> newFiles = writer.result();
- for (DataFileMeta file : inputFiles) {
- fileLevels.removeFile(file);
- }
- for (DataFileMeta newFile : newFiles) {
- fileLevels.addNewFile(newFile);
- }
-
- // Rebuild key index for the new files
- for (DataFileMeta newFile : newFiles) {
- int fileId = fileLevels.getFileIdByName(newFile.fileName());
- int position = 0;
- try (CloseableIterator<InternalRow> keyIterator =
readKeyIterator(newFile)) {
- while (keyIterator.hasNext()) {
- byte[] key =
keySerializer.serializeToBytes(keyIterator.next());
- ByteArrayOutputStream value = new
ByteArrayOutputStream(8);
- encodeInt(value, fileId);
- encodeInt(value, position);
- kvDb.put(key, value.toByteArray());
- position++;
- }
- }
- }
-
- return newFiles;
- } finally {
- for (CloseableIterator<KeyValue> iterator : openIterators) {
- try {
- iterator.close();
- } catch (Exception ignored) {
- }
- }
- }
- }
-
- /**
- * Spill a columnar DataFileMeta to a row-based temp file. Row-based files
consume much less
- * memory when reading compared to columnar files.
- */
- private SpilledChannel spillToRowBasedFile(DataFileMeta file) throws
Exception {
- FileIOChannel.ID channel = ioManager.createChannel();
- KeyValueWithLevelNoReusingSerializer serializer =
- new KeyValueWithLevelNoReusingSerializer(keyType, valueType);
- BlockCompressionFactory compressFactory =
BlockCompressionFactory.create(compression);
- int compressBlock = (int) MemorySize.parse("64 kb").getBytes();
-
- ChannelWithMeta channelWithMeta;
- ChannelWriterOutputView out =
- FileChannelUtil.createOutputView(
- ioManager, channel, compressFactory, compressBlock);
- try (RecordReader<KeyValue> reader =
valueReaderFactory.createRecordReader(file)) {
- RecordIterator<KeyValue> batch;
- KeyValue record;
- while ((batch = reader.readBatch()) != null) {
- while ((record = batch.next()) != null) {
- serializer.serialize(record, out);
- }
- batch.releaseBatch();
- }
- } finally {
- out.close();
- channelWithMeta =
- new ChannelWithMeta(channel, out.getBlockCount(),
out.getWriteBytes());
- }
-
- return new SpilledChannel(channelWithMeta, compressFactory,
compressBlock, serializer);
- }
-
- /** Holds metadata for a spilled row-based temp file. */
- private class SpilledChannel {
- private final ChannelWithMeta channel;
- private final BlockCompressionFactory compressFactory;
- private final int compressBlock;
- private final KeyValueWithLevelNoReusingSerializer serializer;
-
- SpilledChannel(
- ChannelWithMeta channel,
- BlockCompressionFactory compressFactory,
- int compressBlock,
- KeyValueWithLevelNoReusingSerializer serializer) {
- this.channel = channel;
- this.compressFactory = compressFactory;
- this.compressBlock = compressBlock;
- this.serializer = serializer;
- }
-
- CloseableIterator<KeyValue> createIterator() throws IOException {
- ChannelReaderInputView view =
- FileChannelUtil.createInputView(
- ioManager, channel, new ArrayList<>(),
compressFactory, compressBlock);
- BinaryRowSerializer rowSerializer = new
BinaryRowSerializer(serializer.numFields());
- ChannelReaderInputViewIterator iterator =
- new ChannelReaderInputViewIterator(view, null,
rowSerializer);
- return new SpilledChannelIterator(view, iterator, serializer);
- }
- }
-
- /** Iterator that reads KeyValue records from a spilled row-based temp
file. */
- private static class SpilledChannelIterator implements
CloseableIterator<KeyValue> {
- private final ChannelReaderInputView view;
- private final ChannelReaderInputViewIterator iterator;
- private final KeyValueWithLevelNoReusingSerializer serializer;
- private KeyValue next;
-
- SpilledChannelIterator(
- ChannelReaderInputView view,
- ChannelReaderInputViewIterator iterator,
- KeyValueWithLevelNoReusingSerializer serializer) {
- this.view = view;
- this.iterator = iterator;
- this.serializer = serializer;
- }
-
- @Override
- public boolean hasNext() {
- if (next != null) {
- return true;
- }
- try {
- BinaryRow row = iterator.next();
- if (row == null) {
- return false;
- }
- next = serializer.fromRow(row);
- return true;
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public KeyValue next() {
- if (!hasNext()) {
- throw new java.util.NoSuchElementException();
- }
- KeyValue result = next;
- next = null;
- return result;
- }
-
- @Override
- public void close() throws Exception {
- view.getChannel().closeAndDelete();
- }
- }
-
- /** Delete key index entries for the given file from kvDb (only if they
still point to it). */
- private void deleteKeyIndexForFile(RowCompactedSerializer keySerializer,
DataFileMeta file)
- throws Exception {
- int fileId = fileLevels.getFileIdByName(file.fileName());
- try (CloseableIterator<InternalRow> iterator = readKeyIterator(file)) {
- while (iterator.hasNext()) {
- byte[] key = keySerializer.serializeToBytes(iterator.next());
- byte[] value = kvDb.get(key);
- if (value != null) {
- int storedFileId = decodeInt(new
ByteArrayInputStream(value));
- if (storedFileId == fileId) {
- kvDb.delete(key);
- }
- }
- }
- }
- }
-
- /** Entry in the min-heap for multi-way merge, holding the current
KeyValue and its iterator. */
- private static class MergeEntry {
- KeyValue currentKeyValue;
- final CloseableIterator<KeyValue> iterator;
-
- MergeEntry(KeyValue currentKeyValue, CloseableIterator<KeyValue>
iterator) {
- this.currentKeyValue = currentKeyValue;
- this.iterator = iterator;
- }
- }
-
@Override
public Optional<CompactResult> getCompactionResult(boolean blocking)
throws ExecutionException, InterruptedException {
@@ -791,6 +254,6 @@ public class ClusteringCompactManager extends CompactFutureManager {
@Override
public void close() throws IOException {
- kvDb.close();
+ keyIndex.close();
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java
similarity index 50%
copy from paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
copy to paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java
index 8c8806daeb..ece4813764 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringCompactManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringFileRewriter.java
@@ -19,22 +19,14 @@
package org.apache.paimon.mergetree.compact.clustering;
import org.apache.paimon.KeyValue;
-import org.apache.paimon.KeyValueFileStore;
import org.apache.paimon.KeyValueSerializer;
-import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.codegen.RecordComparator;
-import org.apache.paimon.compact.CompactDeletionFile;
-import org.apache.paimon.compact.CompactFutureManager;
-import org.apache.paimon.compact.CompactResult;
-import org.apache.paimon.compact.CompactTask;
import org.apache.paimon.compression.BlockCompressionFactory;
import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.BinaryRowSerializer;
import org.apache.paimon.data.serializer.InternalRowSerializer;
-import org.apache.paimon.data.serializer.RowCompactedSerializer;
-import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
import org.apache.paimon.disk.ChannelReaderInputView;
import org.apache.paimon.disk.ChannelReaderInputViewIterator;
import org.apache.paimon.disk.ChannelWithMeta;
@@ -46,11 +38,7 @@ import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.io.RollingFileWriter;
-import org.apache.paimon.io.cache.CacheManager;
-import org.apache.paimon.lookup.sort.db.SimpleLsmKvDb;
-import org.apache.paimon.operation.metrics.CompactionMetrics;
import org.apache.paimon.options.MemorySize;
-import org.apache.paimon.reader.FileRecordIterator;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.RecordReader.RecordIterator;
import org.apache.paimon.sort.BinaryExternalSortBuffer;
@@ -59,405 +47,74 @@ import org.apache.paimon.utils.CloseableIterator;
import org.apache.paimon.utils.KeyValueWithLevelNoReusingSerializer;
import org.apache.paimon.utils.MutableObjectIterator;
-import javax.annotation.Nullable;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Comparator;
import java.util.List;
-import java.util.Optional;
import java.util.PriorityQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.stream.IntStream;
-
-import static java.util.Collections.singletonList;
-import static org.apache.paimon.utils.VarLengthIntUtils.decodeInt;
-import static org.apache.paimon.utils.VarLengthIntUtils.encodeInt;
/**
- * Key Value clustering compact manager for {@link KeyValueFileStore}.
- *
- * <p>Compaction is triggered when unsorted files exist. The compaction
process has two phases:
- *
- * <ol>
- * <li><b>Phase 1</b>: Sort and rewrite all unsorted (level 0) files by
clustering columns.
- * <li><b>Phase 2</b>: Merge sorted files based on clustering column key
range overlap. Files are
- * grouped into sections where each section contains overlapping files.
Adjacent sections are
- * merged when beneficial (overlapping files or small sections) to
reduce IO amplification
- * while consolidating small files.
- * </ol>
+ * Handles file rewriting for clustering compaction, including sorting
unsorted files (Phase 1) and
+ * merging sorted files via multi-way merge (Phase 2).
*/
-public class ClusteringCompactManager extends CompactFutureManager {
+public class ClusteringFileRewriter {
private final RowType keyType;
private final RowType valueType;
- private final long sortSpillBufferSize;
- private final int pageSize;
- private final int maxNumFileHandles;
- private final int spillThreshold;
- private final CompressOptions compression;
private final int[] clusteringColumns;
private final RecordComparator clusteringComparatorAlone;
private final RecordComparator clusteringComparatorInValue;
private final IOManager ioManager;
- private final KeyValueFileReaderFactory keyReaderFactory;
private final KeyValueFileReaderFactory valueReaderFactory;
private final KeyValueFileWriterFactory writerFactory;
- private final ExecutorService executor;
- private final BucketedDvMaintainer dvMaintainer;
- private final SimpleLsmKvDb kvDb;
- private final boolean lazyGenDeletionFile;
- private final boolean firstRow;
- @Nullable private final CompactionMetrics.Reporter metricsReporter;
-
private final ClusteringFiles fileLevels;
private final long targetFileSize;
+ private final long sortSpillBufferSize;
+ private final int pageSize;
+ private final int maxNumFileHandles;
+ private final int spillThreshold;
+ private final CompressOptions compression;
- public ClusteringCompactManager(
+ public ClusteringFileRewriter(
RowType keyType,
RowType valueType,
- List<String> clusteringColumns,
+ int[] clusteringColumns,
+ RecordComparator clusteringComparatorAlone,
+ RecordComparator clusteringComparatorInValue,
IOManager ioManager,
- CacheManager cacheManager,
- KeyValueFileReaderFactory keyReaderFactory,
KeyValueFileReaderFactory valueReaderFactory,
KeyValueFileWriterFactory writerFactory,
- ExecutorService executor,
- BucketedDvMaintainer dvMaintainer,
- boolean lazyGenDeletionFile,
- List<DataFileMeta> restoreFiles,
+ ClusteringFiles fileLevels,
long targetFileSize,
long sortSpillBufferSize,
int pageSize,
int maxNumFileHandles,
int spillThreshold,
- CompressOptions compression,
- boolean firstRow,
- @Nullable CompactionMetrics.Reporter metricsReporter) {
- this.targetFileSize = targetFileSize;
+ CompressOptions compression) {
this.keyType = keyType;
this.valueType = valueType;
+ this.clusteringColumns = clusteringColumns;
+ this.clusteringComparatorAlone = clusteringComparatorAlone;
+ this.clusteringComparatorInValue = clusteringComparatorInValue;
+ this.ioManager = ioManager;
+ this.valueReaderFactory = valueReaderFactory;
+ this.writerFactory = writerFactory;
+ this.fileLevels = fileLevels;
+ this.targetFileSize = targetFileSize;
this.sortSpillBufferSize = sortSpillBufferSize;
this.pageSize = pageSize;
this.maxNumFileHandles = maxNumFileHandles;
this.spillThreshold = spillThreshold;
this.compression = compression;
- this.firstRow = firstRow;
- this.clusteringColumns = valueType.projectIndexes(clusteringColumns);
- this.clusteringComparatorAlone =
- CodeGenUtils.newRecordComparator(
- valueType.project(clusteringColumns).getFieldTypes(),
- IntStream.range(0, clusteringColumns.size()).toArray(),
- true);
- this.clusteringComparatorInValue =
- CodeGenUtils.newRecordComparator(
- valueType.getFieldTypes(), this.clusteringColumns,
true);
- this.ioManager = ioManager;
- this.keyReaderFactory = keyReaderFactory;
- this.valueReaderFactory = valueReaderFactory;
- this.writerFactory = writerFactory;
- this.executor = executor;
- this.dvMaintainer = dvMaintainer;
- this.lazyGenDeletionFile = lazyGenDeletionFile;
- this.metricsReporter = metricsReporter;
- this.fileLevels = new ClusteringFiles();
- restoreFiles.forEach(this::addNewFile);
-
- this.kvDb =
- SimpleLsmKvDb.builder(new File(ioManager.pickRandomTempDir()))
- .cacheManager(cacheManager)
- .keyComparator(new
RowCompactedSerializer(keyType).createSliceComparator())
- .build();
- bootstrapKeyIndex(restoreFiles);
- }
-
- private void bootstrapKeyIndex(List<DataFileMeta> restoreFiles) {
- RowCompactedSerializer keySerializer = new
RowCompactedSerializer(keyType);
- for (DataFileMeta file : restoreFiles) {
- if (file.level() == 0) {
- continue;
- }
- int fileId = fileLevels.getFileIdByName(file.fileName());
- // Read with DV (auto-skips deleted rows). Use
FileRecordIterator.returnedPosition()
- // to get correct physical positions even after DV filtering.
- try (RecordReader<KeyValue> reader =
keyReaderFactory.createRecordReader(file)) {
- FileRecordIterator<KeyValue> batch;
- while ((batch = (FileRecordIterator<KeyValue>)
reader.readBatch()) != null) {
- KeyValue kv;
- while ((kv = batch.next()) != null) {
- int position = (int) batch.returnedPosition();
- byte[] keyBytes =
keySerializer.serializeToBytes(kv.key());
- ByteArrayOutputStream value = new
ByteArrayOutputStream(8);
- encodeInt(value, fileId);
- encodeInt(value, position);
- kvDb.put(keyBytes, value.toByteArray());
- }
- batch.releaseBatch();
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- private CloseableIterator<InternalRow> readKeyIterator(DataFileMeta file)
throws IOException {
- //noinspection resource
- return keyReaderFactory
- .createRecordReader(file)
- .transform(KeyValue::key)
- .toCloseableIterator();
- }
-
- @Override
- public boolean shouldWaitForLatestCompaction() {
- return false;
- }
-
- @Override
- public boolean shouldWaitForPreparingCheckpoint() {
- return false;
- }
-
- @Override
- public void addNewFile(DataFileMeta file) {
- fileLevels.addNewFile(file);
- }
-
- @Override
- public List<DataFileMeta> allFiles() {
- return fileLevels.allFiles();
- }
-
- @Override
- public void triggerCompaction(boolean fullCompaction) {
- taskFuture =
- executor.submit(
- new CompactTask(metricsReporter) {
- @Override
- protected CompactResult doCompact() throws
Exception {
- return compact(fullCompaction);
- }
- });
- }
-
- private CompactResult compact(boolean fullCompaction) throws Exception {
- RowCompactedSerializer keySerializer = new
RowCompactedSerializer(keyType);
- KeyValueSerializer kvSerializer = new KeyValueSerializer(keyType,
valueType);
- RowType kvSchemaType = KeyValue.schema(keyType, valueType);
-
- CompactResult result = new CompactResult();
-
- // Phase 1: Sort and rewrite all unsorted (level 0) files
- List<DataFileMeta> unsortedFiles = fileLevels.unsortedFiles();
- // Snapshot sorted files before Phase 1 to avoid including newly
created files in Phase 2
- List<DataFileMeta> existingSortedFiles = fileLevels.sortedFiles();
- for (DataFileMeta file : unsortedFiles) {
- List<DataFileMeta> sortedFiles =
- sortAndRewriteFiles(singletonList(file), kvSerializer,
kvSchemaType);
- updateKeyIndex(keySerializer, file, sortedFiles);
- result.before().add(file);
- result.after().addAll(sortedFiles);
- }
-
- // Phase 2: Universal Compaction on sorted files that existed before
Phase 1.
- // Files produced by Phase 1 are excluded to avoid the same file
appearing in both
- // result.before() and result.after().
- List<List<DataFileMeta>> mergeGroups;
- if (fullCompaction) {
- mergeGroups = singletonList(existingSortedFiles);
- } else {
- mergeGroups = pickMergeCandidates(existingSortedFiles);
- }
-
- for (List<DataFileMeta> mergeGroup : mergeGroups) {
- if (mergeGroup.size() >= 2) {
- List<DataFileMeta> mergedFiles =
mergeAndRewriteFiles(mergeGroup, keySerializer);
- result.before().addAll(mergeGroup);
- result.after().addAll(mergedFiles);
- }
- }
-
- CompactDeletionFile deletionFile =
- lazyGenDeletionFile
- ? CompactDeletionFile.lazyGeneration(dvMaintainer)
- : CompactDeletionFile.generateFiles(dvMaintainer);
- result.setDeletionFile(deletionFile);
- return result;
- }
-
- /**
- * Pick merge candidate groups based on clustering column range overlap
and file sizes.
- *
- * <ol>
- * <li><b>Group into sections</b>: Files are sorted by minKey and
grouped into sections based
- * on clustering column key range overlap. Overlapping files belong
to the same section.
- * <li><b>Merge adjacent sections</b>: Sections that have overlapping
files (size >= 2) or
- * are small (total size < targetFileSize/2) are accumulated
together. Large
- * single-file sections act as barriers, flushing accumulated files
into a merge group.
- * </ol>
- *
- * @param sortedFiles all sorted files
- * @return list of merge groups; each group contains files to merge
together
- */
- private List<List<DataFileMeta>> pickMergeCandidates(List<DataFileMeta>
sortedFiles) {
- if (sortedFiles.size() < 2) {
- return java.util.Collections.emptyList();
- }
-
- // Step 1: Group files into sections based on clustering column range
overlap.
- List<List<DataFileMeta>> sections = groupIntoSections(sortedFiles);
-
- // Step 2: Merge adjacent sections when beneficial to reduce small
files.
- // A section should be merged if it has overlapping files (size >= 2)
or is small.
- long smallSectionThreshold = targetFileSize / 2;
- List<List<DataFileMeta>> mergeGroups = new ArrayList<>();
- List<DataFileMeta> pending = new ArrayList<>();
-
- for (List<DataFileMeta> section : sections) {
- boolean needsMerge = section.size() >= 2;
- boolean isSmall = sectionSize(section) < smallSectionThreshold;
-
- if (needsMerge || isSmall) {
- // This section should be merged, accumulate it
- pending.addAll(section);
- } else {
- // This section is a single large file, flush pending if any
- if (pending.size() >= 2) {
- mergeGroups.add(new ArrayList<>(pending));
- }
- pending.clear();
- }
- }
-
- // Flush remaining pending files
- if (pending.size() >= 2) {
- mergeGroups.add(pending);
- }
-
- return mergeGroups;
- }
-
- private long sectionSize(List<DataFileMeta> section) {
- long total = 0;
- for (DataFileMeta file : section) {
- total += file.fileSize();
- }
- return total;
}
/**
- * Group files into sections based on clustering column key range overlap.
Files are first
- * sorted by minKey, then adjacent files with overlapping ranges are
grouped into the same
- * section.
- *
- * @param files input files
- * @return list of sections, each section contains overlapping files
+ * Sort and rewrite unsorted files by clustering columns. Reads all
KeyValue records, sorts them
+ * using an external sort buffer, and writes to new level-1 files.
*/
- private List<List<DataFileMeta>> groupIntoSections(List<DataFileMeta>
files) {
- // Sort files by minKey to properly detect overlapping ranges
- List<DataFileMeta> sorted = new ArrayList<>(files);
- sorted.sort((a, b) -> clusteringComparatorAlone.compare(a.minKey(),
b.minKey()));
-
- List<List<DataFileMeta>> sections = new ArrayList<>();
- List<DataFileMeta> currentSection = new ArrayList<>();
- currentSection.add(sorted.get(0));
- BinaryRow currentMax = sorted.get(0).maxKey();
-
- for (int i = 1; i < sorted.size(); i++) {
- DataFileMeta file = sorted.get(i);
- if (clusteringComparatorAlone.compare(currentMax, file.minKey())
>= 0) {
- // Overlaps with current section
- currentSection.add(file);
- if (clusteringComparatorAlone.compare(file.maxKey(),
currentMax) > 0) {
- currentMax = file.maxKey();
- }
- } else {
- sections.add(currentSection);
- currentSection = new ArrayList<>();
- currentSection.add(file);
- currentMax = file.maxKey();
- }
- }
- sections.add(currentSection);
- return sections;
- }
-
- /**
- * Update the key index for a single original file replaced by new sorted
files. Marks old key
- * positions in deletion vectors and registers new positions.
- */
- private void updateKeyIndex(
- RowCompactedSerializer keySerializer,
- DataFileMeta originalFile,
- List<DataFileMeta> newSortedFiles)
- throws Exception {
- updateKeyIndex(keySerializer, singletonList(originalFile),
newSortedFiles);
- }
-
- /**
- * Update the key index for multiple original files replaced by new sorted
files.
- *
- * <p>For DEDUPLICATE mode: mark the old position in deletion vectors,
keep the new position.
- *
- * <p>For FIRST_ROW mode: if key exists, mark the new position in deletion
vectors (keep the
- * first/old one); if key is new, store the new position.
- */
- private void updateKeyIndex(
- RowCompactedSerializer keySerializer,
- List<DataFileMeta> originalFiles,
- List<DataFileMeta> newSortedFiles)
- throws Exception {
- // Collect file names of original files to avoid self-deletion marking
- java.util.Set<String> originalFileNames = new java.util.HashSet<>();
- for (DataFileMeta file : originalFiles) {
- originalFileNames.add(file.fileName());
- }
-
- for (DataFileMeta sortedFile : newSortedFiles) {
- int fileId = fileLevels.getFileIdByName(sortedFile.fileName());
- int position = 0;
- try (CloseableIterator<InternalRow> iterator =
readKeyIterator(sortedFile)) {
- while (iterator.hasNext()) {
- byte[] key =
keySerializer.serializeToBytes(iterator.next());
- byte[] oldValue = kvDb.get(key);
- if (oldValue != null) {
- ByteArrayInputStream valueIn = new
ByteArrayInputStream(oldValue);
- int oldFileId = decodeInt(valueIn);
- int oldPosition = decodeInt(valueIn);
- DataFileMeta oldFile =
fileLevels.getFileById(oldFileId);
- if (oldFile != null &&
!originalFileNames.contains(oldFile.fileName())) {
- if (firstRow) {
- // First-row mode: keep the old (first)
record, delete the new one
-
dvMaintainer.notifyNewDeletion(sortedFile.fileName(), position);
- position++;
- continue;
- } else {
- // Deduplicate mode: keep the new record,
delete the old one
-
dvMaintainer.notifyNewDeletion(oldFile.fileName(), oldPosition);
- }
- }
- }
- ByteArrayOutputStream value = new ByteArrayOutputStream(8);
- encodeInt(value, fileId);
- encodeInt(value, position);
- kvDb.put(key, value.toByteArray());
- position++;
- }
- }
- }
- }
-
- /**
- * Sort and rewrite one or more unsorted files by clustering columns.
Reads all KeyValue records
- * from the input files, sorts them using an external sort buffer, and
writes to new level-1
- * files.
- */
- private List<DataFileMeta> sortAndRewriteFiles(
+ public List<DataFileMeta> sortAndRewriteFiles(
List<DataFileMeta> inputFiles, KeyValueSerializer kvSerializer,
RowType kvSchemaType)
throws Exception {
int[] sortFieldsInKeyValue =
@@ -517,27 +174,54 @@ public class ClusteringCompactManager extends CompactFutureManager {
}
/**
- * Merge sorted files using min-heap based multi-way merge. Since all input files are already
- * sorted by clustering columns, we use a PriorityQueue to merge them efficiently without
- * re-sorting. Key index entries are deleted during reading and rebuilt after writing.
+ * Pick merge candidate groups based on clustering column range overlap and file sizes.
*
- * <p>When the number of input files exceeds spillThreshold, smaller files are spilled to
- * row-based temp files first. Row-based iterators consume much less memory than columnar file
- * readers.
+ * @param sortedFiles all sorted files
+ * @return list of merge groups; each group contains files to merge together
*/
- private List<DataFileMeta> mergeAndRewriteFiles(
- List<DataFileMeta> inputFiles, RowCompactedSerializer
keySerializer) throws Exception {
- InternalRowSerializer keyRowSerializer = new
InternalRowSerializer(keyType);
- InternalRowSerializer valueRowSerializer = new
InternalRowSerializer(valueType);
+ public List<List<DataFileMeta>> pickMergeCandidates(List<DataFileMeta>
sortedFiles) {
+ if (sortedFiles.size() < 2) {
+ return Collections.emptyList();
+ }
- // Delete key index entries for all input files before reading
- for (DataFileMeta file : inputFiles) {
- deleteKeyIndexForFile(keySerializer, file);
+ List<List<DataFileMeta>> sections = groupIntoSections(sortedFiles);
+
+ long smallSectionThreshold = targetFileSize / 2;
+ List<List<DataFileMeta>> mergeGroups = new ArrayList<>();
+ List<DataFileMeta> pending = new ArrayList<>();
+
+ for (List<DataFileMeta> section : sections) {
+ boolean needsMerge = section.size() >= 2;
+ boolean isSmall = sectionSize(section) < smallSectionThreshold;
+
+ if (needsMerge || isSmall) {
+ pending.addAll(section);
+ } else {
+ if (pending.size() >= 2) {
+ mergeGroups.add(new ArrayList<>(pending));
+ }
+ pending.clear();
+ }
}
+ if (pending.size() >= 2) {
+ mergeGroups.add(pending);
+ }
+
+ return mergeGroups;
+ }
+
+ /**
+ * Merge sorted files using min-heap based multi-way merge. Key index entries are deleted before
+ * reading and rebuilt after writing by the caller.
+ */
+ public List<DataFileMeta> mergeAndRewriteFiles(List<DataFileMeta> inputFiles) throws Exception {
+ InternalRowSerializer keyRowSerializer = new InternalRowSerializer(keyType);
+ InternalRowSerializer valueRowSerializer = new InternalRowSerializer(valueType);
+
// Determine which files to spill to row-based temp files
List<DataFileMeta> filesToSpill = new ArrayList<>();
- List<DataFileMeta> filesToKeep = new ArrayList<>();
+ List<DataFileMeta> filesToKeep;
if (inputFiles.size() > spillThreshold) {
List<DataFileMeta> sortedBySize = new ArrayList<>(inputFiles);
sortedBySize.sort(Comparator.comparingLong(DataFileMeta::fileSize));
@@ -563,7 +247,6 @@ public class ClusteringCompactManager extends CompactFutureManager {
a.currentKeyValue.value(),
b.currentKeyValue.value()));
try {
- // Add iterators for columnar files (kept in memory)
for (DataFileMeta file : filesToKeep) {
@SuppressWarnings("resource")
CloseableIterator<KeyValue> iterator =
@@ -575,7 +258,6 @@ public class ClusteringCompactManager extends CompactFutureManager {
}
}
- // Add iterators for row-based spilled files (low memory consumption)
for (SpilledChannel spilled : spilledChannels) {
                CloseableIterator<KeyValue> iterator = spilled.createIterator();
openIterators.add(iterator);
@@ -585,7 +267,6 @@ public class ClusteringCompactManager extends CompactFutureManager {
}
}
- // Multi-way merge: write records in sorted order
RollingFileWriter<KeyValue, DataFileMeta> writer =
writerFactory.createRollingClusteringFileWriter();
try {
@@ -602,7 +283,6 @@ public class ClusteringCompactManager extends CompactFutureManager {
writer.close();
}
- // Remove original files and register new sorted files
List<DataFileMeta> newFiles = writer.result();
for (DataFileMeta file : inputFiles) {
fileLevels.removeFile(file);
@@ -611,22 +291,6 @@ public class ClusteringCompactManager extends CompactFutureManager {
fileLevels.addNewFile(newFile);
}
- // Rebuild key index for the new files
- for (DataFileMeta newFile : newFiles) {
- int fileId = fileLevels.getFileIdByName(newFile.fileName());
- int position = 0;
- try (CloseableIterator<InternalRow> keyIterator = readKeyIterator(newFile)) {
- while (keyIterator.hasNext()) {
- byte[] key = keySerializer.serializeToBytes(keyIterator.next());
- ByteArrayOutputStream value = new ByteArrayOutputStream(8);
- encodeInt(value, fileId);
- encodeInt(value, position);
- kvDb.put(key, value.toByteArray());
- position++;
- }
- }
- }
-
return newFiles;
} finally {
for (CloseableIterator<KeyValue> iterator : openIterators) {
@@ -638,10 +302,41 @@ public class ClusteringCompactManager extends CompactFutureManager {
}
}
- /**
- * Spill a columnar DataFileMeta to a row-based temp file. Row-based files consume much less
- * memory when reading compared to columnar files.
- */
+ private List<List<DataFileMeta>> groupIntoSections(List<DataFileMeta> files) {
+ List<DataFileMeta> sorted = new ArrayList<>(files);
+ sorted.sort((a, b) -> clusteringComparatorAlone.compare(a.minKey(), b.minKey()));
+
+ List<List<DataFileMeta>> sections = new ArrayList<>();
+ List<DataFileMeta> currentSection = new ArrayList<>();
+ currentSection.add(sorted.get(0));
+ BinaryRow currentMax = sorted.get(0).maxKey();
+
+ for (int i = 1; i < sorted.size(); i++) {
+ DataFileMeta file = sorted.get(i);
+ if (clusteringComparatorAlone.compare(currentMax, file.minKey()) >= 0) {
+ currentSection.add(file);
+ if (clusteringComparatorAlone.compare(file.maxKey(), currentMax) > 0) {
+ currentMax = file.maxKey();
+ }
+ } else {
+ sections.add(currentSection);
+ currentSection = new ArrayList<>();
+ currentSection.add(file);
+ currentMax = file.maxKey();
+ }
+ }
+ sections.add(currentSection);
+ return sections;
+ }
+
+ private long sectionSize(List<DataFileMeta> section) {
+ long total = 0;
+ for (DataFileMeta file : section) {
+ total += file.fileSize();
+ }
+ return total;
+ }
+
    private SpilledChannel spillToRowBasedFile(DataFileMeta file) throws Exception {
FileIOChannel.ID channel = ioManager.createChannel();
KeyValueWithLevelNoReusingSerializer serializer =
@@ -749,25 +444,7 @@ public class ClusteringCompactManager extends CompactFutureManager {
}
}
- /** Delete key index entries for the given file from kvDb (only if they still point to it). */
- private void deleteKeyIndexForFile(RowCompactedSerializer keySerializer, DataFileMeta file)
- throws Exception {
- int fileId = fileLevels.getFileIdByName(file.fileName());
- try (CloseableIterator<InternalRow> iterator = readKeyIterator(file)) {
- while (iterator.hasNext()) {
- byte[] key = keySerializer.serializeToBytes(iterator.next());
- byte[] value = kvDb.get(key);
- if (value != null) {
- int storedFileId = decodeInt(new ByteArrayInputStream(value));
- if (storedFileId == fileId) {
- kvDb.delete(key);
- }
- }
- }
- }
- }
-
- /** Entry in the min-heap for multi-way merge, holding the current KeyValue and its iterator. */
+ /** Entry in the min-heap for multi-way merge. */
private static class MergeEntry {
KeyValue currentKeyValue;
final CloseableIterator<KeyValue> iterator;
@@ -777,20 +454,4 @@ public class ClusteringCompactManager extends CompactFutureManager {
this.iterator = iterator;
}
}
-
- @Override
- public Optional<CompactResult> getCompactionResult(boolean blocking)
- throws ExecutionException, InterruptedException {
- return innerGetCompactionResult(blocking);
- }
-
- @Override
- public boolean compactNotCompleted() {
- return super.compactNotCompleted() || fileLevels.compactNotCompleted();
- }
-
- @Override
- public void close() throws IOException {
- kvDb.close();
- }
}
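
Note on usage: as the javadoc on mergeAndRewriteFiles above states, deleting and rebuilding key index entries is the caller's responsibility. A minimal sketch of such a driver, assuming hypothetical rewriter (this class) and keyIndex (ClusteringKeyIndex) instances; the real call sites are not part of this excerpt:

    // Illustrative only: merge each candidate group and keep the key index consistent.
    for (List<DataFileMeta> group : rewriter.pickMergeCandidates(sortedFiles)) {
        for (DataFileMeta file : group) {
            keyIndex.deleteIndex(file); // drop entries still pointing at the inputs
        }
        List<DataFileMeta> merged = rewriter.mergeAndRewriteFiles(group);
        for (DataFileMeta newFile : merged) {
            keyIndex.rebuildIndex(newFile); // re-point entries at the rewritten files
        }
    }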
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java
new file mode 100644
index 0000000000..d7234345f6
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/clustering/ClusteringKeyIndex.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact.clustering;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.compression.CompressOptions;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.RowCompactedSerializer;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.KeyValueFileReaderFactory;
+import org.apache.paimon.lookup.sort.db.SimpleLsmKvDb;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.reader.FileRecordIterator;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.sort.BinaryExternalSortBuffer;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarBinaryType;
+import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.MutableObjectIterator;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import static org.apache.paimon.utils.VarLengthIntUtils.decodeInt;
+import static org.apache.paimon.utils.VarLengthIntUtils.encodeInt;
+
+/**
+ * Manages the primary key index for clustering compaction. Maps each primary key to its file
+ * location (fileId + row position) using a {@link SimpleLsmKvDb}.
+ */
+public class ClusteringKeyIndex implements Closeable {
+
+ private final RowType keyType;
+ private final IOManager ioManager;
+ private final KeyValueFileReaderFactory keyReaderFactory;
+ private final BucketedDvMaintainer dvMaintainer;
+ private final SimpleLsmKvDb kvDb;
+ private final ClusteringFiles fileLevels;
+ private final boolean firstRow;
+ private final long sortSpillBufferSize;
+ private final int pageSize;
+ private final int maxNumFileHandles;
+ private final CompressOptions compression;
+
+ public ClusteringKeyIndex(
+ RowType keyType,
+ IOManager ioManager,
+ KeyValueFileReaderFactory keyReaderFactory,
+ BucketedDvMaintainer dvMaintainer,
+ SimpleLsmKvDb kvDb,
+ ClusteringFiles fileLevels,
+ boolean firstRow,
+ long sortSpillBufferSize,
+ int pageSize,
+ int maxNumFileHandles,
+ CompressOptions compression) {
+ this.keyType = keyType;
+ this.ioManager = ioManager;
+ this.keyReaderFactory = keyReaderFactory;
+ this.dvMaintainer = dvMaintainer;
+ this.kvDb = kvDb;
+ this.fileLevels = fileLevels;
+ this.firstRow = firstRow;
+ this.sortSpillBufferSize = sortSpillBufferSize;
+ this.pageSize = pageSize;
+ this.maxNumFileHandles = maxNumFileHandles;
+ this.compression = compression;
+ }
+
+ /** Bootstrap the key index from existing sorted files using external sort + bulk load. */
+ public void bootstrap(List<DataFileMeta> restoreFiles) {
+ List<DataField> combinedFields = new ArrayList<>();
+ List<DataField> keyFields = keyType.getFields();
+ for (int i = 0; i < keyFields.size(); i++) {
+ DataField kf = keyFields.get(i);
+ combinedFields.add(new DataField(i, kf.name(), kf.type()));
+ }
+ int valueFieldIndex = keyFields.size();
+ combinedFields.add(
+ new DataField(
+ valueFieldIndex, "_value_bytes", new
VarBinaryType(Integer.MAX_VALUE)));
+ RowType combinedType = new RowType(combinedFields);
+
+ int[] sortFields = IntStream.range(0, keyType.getFieldCount()).toArray();
+ BinaryExternalSortBuffer sortBuffer =
+ BinaryExternalSortBuffer.create(
+ ioManager,
+ combinedType,
+ sortFields,
+ sortSpillBufferSize,
+ pageSize,
+ maxNumFileHandles,
+ compression,
+ MemorySize.MAX_VALUE,
+ false);
+
+ RowCompactedSerializer keySerializer = new RowCompactedSerializer(keyType);
+ InternalRow.FieldGetter[] keyFieldGetters =
+ new InternalRow.FieldGetter[keyType.getFieldCount()];
+ for (int i = 0; i < keyType.getFieldCount(); i++) {
+ keyFieldGetters[i] = InternalRow.createFieldGetter(keyType.getTypeAt(i), i);
+ }
+ try {
+ for (DataFileMeta file : restoreFiles) {
+ if (file.level() == 0) {
+ continue;
+ }
+ int fileId = fileLevels.getFileIdByName(file.fileName());
+ try (RecordReader<KeyValue> reader = keyReaderFactory.createRecordReader(file)) {
+ FileRecordIterator<KeyValue> batch;
+ while ((batch = (FileRecordIterator<KeyValue>) reader.readBatch()) != null) {
+ KeyValue kv;
+ while ((kv = batch.next()) != null) {
+ int position = (int) batch.returnedPosition();
+ ByteArrayOutputStream valueOut = new ByteArrayOutputStream(8);
+ encodeInt(valueOut, fileId);
+ encodeInt(valueOut, position);
+ byte[] valueBytes = valueOut.toByteArray();
+
+ GenericRow combinedRow = new GenericRow(combinedType.getFieldCount());
+ for (int i = 0; i < keyType.getFieldCount(); i++) {
+ combinedRow.setField(
+ i, keyFieldGetters[i].getFieldOrNull(kv.key()));
+ }
+ combinedRow.setField(valueFieldIndex, valueBytes);
+ sortBuffer.write(combinedRow);
+ }
+ batch.releaseBatch();
+ }
+ }
+ }
+
+ MutableObjectIterator<BinaryRow> sortedIterator = sortBuffer.sortedIterator();
+ BinaryRow binaryRow = new BinaryRow(combinedType.getFieldCount());
+ InternalRow.FieldGetter valueGetter =
+ InternalRow.createFieldGetter(
+ new VarBinaryType(Integer.MAX_VALUE), valueFieldIndex);
+
+ Iterator<Map.Entry<byte[], byte[]>> entryIterator =
+ new Iterator<Map.Entry<byte[], byte[]>>() {
+ private BinaryRow current = binaryRow;
+ private boolean hasNext;
+
+ {
+ advance();
+ }
+
+ private void advance() {
+ try {
+ current = sortedIterator.next(current);
+ hasNext = current != null;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return hasNext;
+ }
+
+ @Override
+ public Map.Entry<byte[], byte[]> next() {
+ byte[] key = keySerializer.serializeToBytes(current);
+ byte[] value = (byte[]) valueGetter.getFieldOrNull(current);
+ advance();
+ return new AbstractMap.SimpleImmutableEntry<>(key, value);
+ }
+ };
+
+ kvDb.bulkLoad(entryIterator);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ sortBuffer.clear();
+ }
+ }
+
+ /**
+ * Update the key index after a single original file is replaced by new sorted files.
+ *
+ * <p>For DEDUPLICATE mode: mark the old position in deletion vectors, keep the new position.
+ *
+ * <p>For FIRST_ROW mode: if key exists, mark the new position in deletion vectors (keep the
+ * first/old one); if key is new, store the new position.
+ */
+ public void updateIndex(DataFileMeta originalFile, List<DataFileMeta> newSortedFiles)
+ throws Exception {
+ updateIndex(Collections.singletonList(originalFile), newSortedFiles);
+ }
+
+ /**
+ * Update the key index after multiple original files are replaced by new sorted files.
+ *
+ * @see #updateIndex(DataFileMeta, List)
+ */
+ public void updateIndex(List<DataFileMeta> originalFiles, List<DataFileMeta> newSortedFiles)
+ throws Exception {
+ RowCompactedSerializer keySerializer = new RowCompactedSerializer(keyType);
+
+ Set<String> originalFileNames = new HashSet<>();
+ for (DataFileMeta file : originalFiles) {
+ originalFileNames.add(file.fileName());
+ }
+
+ for (DataFileMeta sortedFile : newSortedFiles) {
+ int fileId = fileLevels.getFileIdByName(sortedFile.fileName());
+ int position = 0;
+ try (CloseableIterator<InternalRow> iterator = readKeyIterator(sortedFile)) {
+ while (iterator.hasNext()) {
+ byte[] key = keySerializer.serializeToBytes(iterator.next());
+ byte[] oldValue = kvDb.get(key);
+ if (oldValue != null) {
+ ByteArrayInputStream valueIn = new ByteArrayInputStream(oldValue);
+ int oldFileId = decodeInt(valueIn);
+ int oldPosition = decodeInt(valueIn);
+ DataFileMeta oldFile = fileLevels.getFileById(oldFileId);
+ if (oldFile != null && !originalFileNames.contains(oldFile.fileName())) {
+ if (firstRow) {
+ dvMaintainer.notifyNewDeletion(sortedFile.fileName(), position);
+ position++;
+ continue;
+ } else {
+ dvMaintainer.notifyNewDeletion(oldFile.fileName(), oldPosition);
+ }
+ }
+ }
+ ByteArrayOutputStream value = new ByteArrayOutputStream(8);
+ encodeInt(value, fileId);
+ encodeInt(value, position);
+ kvDb.put(key, value.toByteArray());
+ position++;
+ }
+ }
+ }
+ }
+
+ /** Delete key index entries for the given file (only if they still point to it). */
+ public void deleteIndex(DataFileMeta file) throws Exception {
+ RowCompactedSerializer keySerializer = new RowCompactedSerializer(keyType);
+ int fileId = fileLevels.getFileIdByName(file.fileName());
+ try (CloseableIterator<InternalRow> iterator = readKeyIterator(file)) {
+ while (iterator.hasNext()) {
+ byte[] key = keySerializer.serializeToBytes(iterator.next());
+ byte[] value = kvDb.get(key);
+ if (value != null) {
+ int storedFileId = decodeInt(new ByteArrayInputStream(value));
+ if (storedFileId == fileId) {
+ kvDb.delete(key);
+ }
+ }
+ }
+ }
+ }
+
+ /** Rebuild key index entries for a newly written file. */
+ public void rebuildIndex(DataFileMeta newFile) throws Exception {
+ RowCompactedSerializer keySerializer = new RowCompactedSerializer(keyType);
+ int fileId = fileLevels.getFileIdByName(newFile.fileName());
+ int position = 0;
+ try (CloseableIterator<InternalRow> keyIterator = readKeyIterator(newFile)) {
+ while (keyIterator.hasNext()) {
+ byte[] key = keySerializer.serializeToBytes(keyIterator.next());
+ ByteArrayOutputStream value = new ByteArrayOutputStream(8);
+ encodeInt(value, fileId);
+ encodeInt(value, position);
+ kvDb.put(key, value.toByteArray());
+ position++;
+ }
+ }
+ }
+
+ private CloseableIterator<InternalRow> readKeyIterator(DataFileMeta file) throws IOException {
+ //noinspection resource
+ return keyReaderFactory
+ .createRecordReader(file)
+ .transform(KeyValue::key)
+ .toCloseableIterator();
+ }
+
+ @Override
+ public void close() throws IOException {
+ kvDb.close();
+ }
+}
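
The value bytes stored per key are simply the fileId and row position written as var-length ints (VarLengthIntUtils), exactly as used in updateIndex, deleteIndex and rebuildIndex above. A minimal round-trip sketch, illustrative only (variable names are made up):

    // Encode an index value: fileId followed by row position.
    ByteArrayOutputStream out = new ByteArrayOutputStream(8);
    encodeInt(out, fileId);
    encodeInt(out, position);
    byte[] indexValue = out.toByteArray();

    // Decode it back in the same order.
    ByteArrayInputStream in = new ByteArrayInputStream(indexValue);
    int decodedFileId = decodeInt(in);
    int decodedPosition = decodeInt(in);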
diff --git a/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java b/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
index f69f5fc6ae..aaf661be2f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/separated/ClusteringTableTest.java
@@ -336,6 +336,56 @@ class ClusteringTableTest {
                .containsExactlyInAnyOrder(GenericRow.of(1, 50), GenericRow.of(2, 60));
}
+ /**
+ * Test that bootstrap correctly rebuilds the key index via bulkLoad from existing sorted files.
+ *
+ * <p>Each writeRows() call creates a new writer (and thus a new ClusteringCompactManager),
+ * which calls {@code keyIndex.bootstrap(restoreFiles)}. The bootstrap method reads all level >
+ * 0 files, sorts them externally, and bulk-loads into the LSM KV DB, bypassing the normal
+ * put-per-entry path. This test verifies that the bulkLoad-based index is correct by checking
+ * deduplication across multiple commits with overlapping keys.
+ */
+ @Test
+ public void testBootstrapBulkLoadIndex() throws Exception {
+ // Commit 1: write initial data → compaction produces level > 0 sorted files
+ writeRows(
+ Arrays.asList(
+ GenericRow.of(1, 10),
+ GenericRow.of(2, 20),
+ GenericRow.of(3, 30),
+ GenericRow.of(4, 40),
+ GenericRow.of(5, 50)));
+
+ // Commit 2: new writer bootstraps index from level > 0 files via bulkLoad,
+ // then writes overlapping keys; updateIndex must find existing entries in the
+ // bulkLoaded index to generate correct deletion vectors
+ writeRows(
+ Arrays.asList(GenericRow.of(1, 100), GenericRow.of(3, 300), GenericRow.of(5, 500)));
+
+ // Verify dedup: keys 1,3,5 updated; keys 2,4 unchanged
+ assertThat(readRows())
+ .containsExactlyInAnyOrder(
+ GenericRow.of(1, 100),
+ GenericRow.of(2, 20),
+ GenericRow.of(3, 300),
+ GenericRow.of(4, 40),
+ GenericRow.of(5, 500));
+
+ // Commit 3: another bootstrap from the updated sorted files,
+ // verifies bulkLoad works correctly after files have been rewritten
+ writeRows(
+ Arrays.asList(GenericRow.of(2, 200), GenericRow.of(4, 400), GenericRow.of(6, 600)));
+
+ assertThat(readRows())
+ .containsExactlyInAnyOrder(
+ GenericRow.of(1, 100),
+ GenericRow.of(2, 200),
+ GenericRow.of(3, 300),
+ GenericRow.of(4, 400),
+ GenericRow.of(5, 500),
+ GenericRow.of(6, 600));
+ }
+
// ==================== Clustering Column Filter Tests ====================
    /** Test that equality filter on clustering column skips irrelevant files in the scan plan. */