This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1753038e81 [core] keep delete record for chain table when performing
compaction (#7057)
1753038e81 is described below
commit 1753038e81015759c70a0758ad137bd1de813319
Author: Stefanietry <[email protected]>
AuthorDate: Sat Jan 17 20:41:29 2026 +0800
[core] keep delete record for chain table when performing compaction (#7057)
---
.../mergetree/compact/MergeTreeCompactManager.java | 9 +-
.../paimon/operation/KeyValueFileStoreWrite.java | 3 +-
.../apache/paimon/mergetree/MergeTreeTestBase.java | 2 +
.../compact/MergeTreeCompactManagerTest.java | 265 +++++++++++++++++++++
4 files changed, 276 insertions(+), 3 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
index f6ea3e137d..6160f74e2f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
@@ -71,6 +71,8 @@ public class MergeTreeCompactManager extends
CompactFutureManager {
@Nullable private final RecordLevelExpire recordLevelExpire;
+ private final boolean keepDelete;
+
public MergeTreeCompactManager(
ExecutorService executor,
Levels levels,
@@ -84,7 +86,8 @@ public class MergeTreeCompactManager extends
CompactFutureManager {
boolean lazyGenDeletionFile,
boolean needLookup,
@Nullable RecordLevelExpire recordLevelExpire,
- boolean forceRewriteAllFiles) {
+ boolean forceRewriteAllFiles,
+ boolean keepDelete) {
this.executor = executor;
this.levels = levels;
this.strategy = strategy;
@@ -98,6 +101,7 @@ public class MergeTreeCompactManager extends
CompactFutureManager {
this.recordLevelExpire = recordLevelExpire;
this.needLookup = needLookup;
this.forceRewriteAllFiles = forceRewriteAllFiles;
+ this.keepDelete = keepDelete;
MetricUtils.safeCall(this::reportMetrics, LOG);
}
@@ -174,7 +178,8 @@ public class MergeTreeCompactManager extends
CompactFutureManager {
* See CompactStrategy.pick.
*/
boolean dropDelete =
- unit.outputLevel() != 0
+ !keepDelete
+ && unit.outputLevel() != 0
&& (unit.outputLevel() >=
levels.nonEmptyHighestLevel()
|| dvMaintainer != null);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 177f093583..326158c7b7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -308,7 +308,8 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
options.prepareCommitWaitCompaction(),
options.needLookup(),
recordLevelExpire,
- options.forceRewriteAllFiles());
+ options.forceRewriteAllFiles(),
+ options.isChainTable());
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index 11d3ede213..9769e59e17 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -455,6 +455,7 @@ public abstract class MergeTreeTestBase {
false,
options.needLookup(),
null,
+ false,
false);
}
@@ -480,6 +481,7 @@ public abstract class MergeTreeTestBase {
false,
false,
null,
+ false,
false);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
index 4adff94778..07406baac5 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
@@ -18,24 +18,54 @@
package org.apache.paimon.mergetree.compact;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.TestFileStore;
+import org.apache.paimon.TestKeyValueGenerator;
import org.apache.paimon.compact.CompactResult;
import org.apache.paimon.compact.CompactUnit;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.deletionvectors.DeletionVector;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFileTestUtils;
+import org.apache.paimon.io.FileReaderFactory;
+import org.apache.paimon.io.KeyValueFileReaderFactory;
+import org.apache.paimon.io.KeyValueFileWriterFactory;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.mergetree.Levels;
+import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.SortedRun;
+import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.operation.SplitRead;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.types.RowKind;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
@@ -43,6 +73,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
+import static org.apache.paimon.format.FileFormat.fileFormat;
import static org.apache.paimon.io.DataFileTestUtils.newFile;
import static org.assertj.core.api.Assertions.assertThat;
@@ -53,6 +84,8 @@ public class MergeTreeCompactManagerTest {
private static ExecutorService service;
+ @TempDir java.nio.file.Path tempDir;
+
@BeforeAll
public static void before() {
service = Executors.newSingleThreadExecutor();
@@ -209,6 +242,7 @@ public class MergeTreeCompactManagerTest {
false,
true,
null,
+ false,
false);
MergeTreeCompactManager defaultManager =
@@ -225,12 +259,242 @@ public class MergeTreeCompactManagerTest {
false,
false,
null,
+ false,
false);
assertThat(lookupManager.compactNotCompleted()).isTrue();
assertThat(defaultManager.compactNotCompleted()).isFalse();
}
+ @Test
+ public void testCompactWithKeepDelete() throws Exception {
+ // 1) Build a minimal TestFileStore with primary key and one bucket.
+ TableSchema tableSchema =
+ SchemaUtils.forceCommit(
+ new SchemaManager(new LocalFileIO(), new
Path(tempDir.toUri())),
+ new Schema(
+
TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(),
+
TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
+ TestKeyValueGenerator.getPrimaryKeys(
+
TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED),
+ Collections.emptyMap(),
+ null));
+ TestFileStore store =
+ new TestFileStore.Builder(
+ "avro",
+ tempDir.toString(),
+ 1,
+ TestKeyValueGenerator.DEFAULT_PART_TYPE,
+ TestKeyValueGenerator.KEY_TYPE,
+ TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+
TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
+ DeduplicateMergeFunction.factory(),
+ tableSchema)
+ .build();
+
+ // 2) Write INSERT + DELETE for the same key into level-0 files.
+ int shopId = 1;
+ long orderId = 42L;
+ String dt = "20240101";
+ int hr = 0;
+
+ GenericRow keyRow = GenericRow.of(shopId, orderId);
+ GenericRow fullRow =
+ GenericRow.of(
+ BinaryString.fromString(dt),
+ hr,
+ shopId,
+ orderId,
+ 100L,
+ null,
+ BinaryString.fromString("comment"));
+
+ BinaryRow keyBinaryRow =
TestKeyValueGenerator.KEY_SERIALIZER.toBinaryRow(keyRow).copy();
+ BinaryRow valueBinaryRow =
+
TestKeyValueGenerator.DEFAULT_ROW_SERIALIZER.toBinaryRow(fullRow).copy();
+
+ InternalRowSerializer partitionSerializer =
+ new
InternalRowSerializer(TestKeyValueGenerator.DEFAULT_PART_TYPE);
+ BinaryRow partition =
+ partitionSerializer
+
.toBinaryRow(GenericRow.of(BinaryString.fromString(dt), hr))
+ .copy();
+
+ KeyValue insert = new KeyValue().replace(keyBinaryRow, 0L,
RowKind.INSERT, valueBinaryRow);
+ KeyValue delete = new KeyValue().replace(keyBinaryRow, 1L,
RowKind.DELETE, valueBinaryRow);
+
+ List<KeyValue> kvs = Arrays.asList(insert, delete);
+
+ List<Snapshot> snapshots =
+ store.commitData(kvs, kv -> partition, kv -> 0 /* single
bucket */);
+ Snapshot snapshot = snapshots.get(snapshots.size() - 1);
+ long snapshotId = snapshot.id();
+
+ // Collect input files from manifest entries of this snapshot.
+ FileStoreScan scan = store.newScan();
+ List<ManifestEntry> manifestEntries =
scan.withSnapshot(snapshotId).plan().files();
+
+ List<DataFileMeta> inputFiles =
+ manifestEntries.stream()
+ .filter(e -> e.kind() == FileKind.ADD)
+ .map(ManifestEntry::file)
+ .collect(Collectors.toList());
+
+ assertThat(inputFiles).isNotEmpty();
+
+ // 3) Create a MergeTreeCompactManager with keepDelete=true and run
compaction.
+ Comparator<InternalRow> keyComparator =
+ Comparator.<InternalRow>comparingInt(r -> r.getInt(0))
+ .thenComparingLong(r -> r.getLong(1));
+
+ CoreOptions coreOptions = store.options();
+ Levels levels = new Levels(keyComparator, inputFiles,
coreOptions.numLevels());
+
+ // Always compact all runs into the highest level.
+ CompactStrategy strategy =
+ (numLevels, runs) ->
Optional.of(CompactUnit.fromLevelRuns(numLevels - 1, runs));
+
+ KeyValueFileReaderFactory.Builder readerFactoryBuilder =
store.newReaderFactoryBuilder();
+ KeyValueFileReaderFactory keyReaderFactory =
+ readerFactoryBuilder.build(partition, 0,
DeletionVector.emptyFactory());
+ FileReaderFactory<KeyValue> readerFactory = keyReaderFactory;
+
+ KeyValueFileWriterFactory.Builder writerFactoryBuilder =
+ KeyValueFileWriterFactory.builder(
+ store.fileIO(),
+ snapshot.schemaId(),
+ TestKeyValueGenerator.KEY_TYPE,
+ TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+ fileFormat(coreOptions),
+ ignore -> store.pathFactory(),
+ coreOptions.targetFileSize(true));
+ KeyValueFileWriterFactory writerFactory =
+ writerFactoryBuilder.build(partition, 0, coreOptions);
+
+ MergeSorter mergeSorter =
+ new MergeSorter(
+ coreOptions,
+ TestKeyValueGenerator.KEY_TYPE,
+ TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+ null);
+
+ MergeTreeCompactRewriter rewriter =
+ new MergeTreeCompactRewriter(
+ readerFactory,
+ writerFactory,
+ keyComparator,
+ null,
+ DeduplicateMergeFunction.factory(),
+ mergeSorter);
+
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ MergeTreeCompactManager manager =
+ new MergeTreeCompactManager(
+ executor,
+ levels,
+ strategy,
+ keyComparator,
+ coreOptions.compactionFileSize(true),
+ coreOptions.numSortedRunStopTrigger(),
+ rewriter,
+ null,
+ null,
+ false,
+ false,
+ null,
+ false,
+ true); // keepDelete=true
+
+ try {
+ manager.triggerCompaction(false);
+ Optional<CompactResult> resultOptional =
manager.getCompactionResult(true);
+ assertThat(resultOptional).isPresent();
+ CompactResult compactResult = resultOptional.get();
+
+ List<DataFileMeta> compactedFiles = compactResult.after();
+ assertThat(compactedFiles).isNotEmpty();
+
+ int highestLevelBefore =
+
inputFiles.stream().mapToInt(DataFileMeta::level).max().orElse(0);
+ for (DataFileMeta file : compactedFiles) {
+ assertThat(file.level()).isNotZero();
+
assertThat(file.level()).isGreaterThanOrEqualTo(highestLevelBefore);
+ }
+
+ // 4) Read back from compacted files (via manifest entries)
without DropDeleteReader
+ // and assert a DELETE record exists.
+ int totalBuckets = coreOptions.bucket();
+ int bucket = 0;
+ List<ManifestEntry> compactedEntries =
+ compactedFiles.stream()
+ .map(
+ file ->
+ ManifestEntry.create(
+ FileKind.ADD,
+ partition,
+ bucket,
+ totalBuckets,
+ file))
+ .collect(Collectors.toList());
+
+ SplitRead<KeyValue> read = store.newRead().forceKeepDelete();
+
+ Map<BinaryRow, Map<Integer, List<DataFileMeta>>>
filesPerPartitionAndBucket =
+ new HashMap<>();
+ for (ManifestEntry entry : compactedEntries) {
+ filesPerPartitionAndBucket
+ .computeIfAbsent(entry.partition(), p -> new
HashMap<>())
+ .computeIfAbsent(entry.bucket(), b -> new
ArrayList<>())
+ .add(entry.file());
+ }
+
+ List<KeyValue> readBack = new ArrayList<>();
+ for (Map.Entry<BinaryRow, Map<Integer, List<DataFileMeta>>> entry :
+ filesPerPartitionAndBucket.entrySet()) {
+ BinaryRow part = entry.getKey();
+ for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry :
+ entry.getValue().entrySet()) {
+ RecordReader<KeyValue> reader =
+ read.createReader(
+ DataSplit.builder()
+ .withPartition(part)
+ .withBucket(bucketEntry.getKey())
+
.withDataFiles(bucketEntry.getValue())
+ .isStreaming(false)
+ .rawConvertible(false)
+ .withBucketPath("not used")
+ .build());
+ RecordReaderIterator<KeyValue> iterator = new
RecordReaderIterator<>(reader);
+ try {
+ while (iterator.hasNext()) {
+ readBack.add(
+ iterator.next()
+ .copy(
+
TestKeyValueGenerator.KEY_SERIALIZER,
+
TestKeyValueGenerator.DEFAULT_ROW_SERIALIZER));
+ }
+ } finally {
+ iterator.close();
+ }
+ }
+ }
+
+ List<KeyValue> recordsForKey =
+ readBack.stream()
+ .filter(
+ kv ->
+ kv.key().getInt(0) == shopId
+ && kv.key().getLong(1) ==
orderId)
+ .collect(Collectors.toList());
+
+ assertThat(recordsForKey).isNotEmpty();
+ assertThat(recordsForKey).allMatch(kv -> kv.valueKind() ==
RowKind.DELETE);
+ } finally {
+ manager.close();
+ executor.shutdownNow();
+ }
+ }
+
private void innerTest(List<LevelMinMax> inputs, List<LevelMinMax>
expected)
throws ExecutionException, InterruptedException {
innerTest(inputs, expected, testStrategy(), true);
@@ -262,6 +526,7 @@ public class MergeTreeCompactManagerTest {
false,
false,
null,
+ false,
false);
manager.triggerCompaction(false);
manager.getCompactionResult(true);