This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1753038e81 [core] keep delete record for chain table when perform compact (#7057)
1753038e81 is described below

commit 1753038e81015759c70a0758ad137bd1de813319
Author: Stefanietry <[email protected]>
AuthorDate: Sat Jan 17 20:41:29 2026 +0800

    [core] keep delete record for chain table when perform compact (#7057)
---
 .../mergetree/compact/MergeTreeCompactManager.java |   9 +-
 .../paimon/operation/KeyValueFileStoreWrite.java   |   3 +-
 .../apache/paimon/mergetree/MergeTreeTestBase.java |   2 +
 .../compact/MergeTreeCompactManagerTest.java       | 265 +++++++++++++++++++++
 4 files changed, 276 insertions(+), 3 deletions(-)
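
For quick orientation, the heart of this change is the drop-delete decision in
MergeTreeCompactManager. Below is a minimal paraphrase of the updated rule (a
sketch, not the actual Paimon source; the parameters are stand-ins for the
fields touched in the diff that follows):

    // A DELETE record may only be discarded during compaction when the table
    // is NOT a chain table (keepDelete == false) and the pre-existing
    // conditions still hold: the output level is above level 0 and is the
    // highest non-empty level, or deletion vectors are maintained.
    static boolean dropDelete(
            boolean keepDelete,          // options.isChainTable() on the write path
            int outputLevel,
            int nonEmptyHighestLevel,
            boolean hasDvMaintainer) {
        return !keepDelete
                && outputLevel != 0
                && (outputLevel >= nonEmptyHighestLevel || hasDvMaintainer);
    }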

diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
index f6ea3e137d..6160f74e2f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
@@ -71,6 +71,8 @@ public class MergeTreeCompactManager extends CompactFutureManager {
 
     @Nullable private final RecordLevelExpire recordLevelExpire;
 
+    private final boolean keepDelete;
+
     public MergeTreeCompactManager(
             ExecutorService executor,
             Levels levels,
@@ -84,7 +86,8 @@ public class MergeTreeCompactManager extends CompactFutureManager {
             boolean lazyGenDeletionFile,
             boolean needLookup,
             @Nullable RecordLevelExpire recordLevelExpire,
-            boolean forceRewriteAllFiles) {
+            boolean forceRewriteAllFiles,
+            boolean keepDelete) {
         this.executor = executor;
         this.levels = levels;
         this.strategy = strategy;
@@ -98,6 +101,7 @@ public class MergeTreeCompactManager extends CompactFutureManager {
         this.recordLevelExpire = recordLevelExpire;
         this.needLookup = needLookup;
         this.forceRewriteAllFiles = forceRewriteAllFiles;
+        this.keepDelete = keepDelete;
 
         MetricUtils.safeCall(this::reportMetrics, LOG);
     }
@@ -174,7 +178,8 @@ public class MergeTreeCompactManager extends CompactFutureManager {
                      * See CompactStrategy.pick.
                      */
                     boolean dropDelete =
-                            unit.outputLevel() != 0
+                            !keepDelete
+                                    && unit.outputLevel() != 0
                                     && (unit.outputLevel() >= levels.nonEmptyHighestLevel()
                                             || dvMaintainer != null);
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 177f093583..326158c7b7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -308,7 +308,8 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
                     options.prepareCommitWaitCompaction(),
                     options.needLookup(),
                     recordLevelExpire,
-                    options.forceRewriteAllFiles());
+                    options.forceRewriteAllFiles(),
+                    options.isChainTable());
         }
     }
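
Note on the wiring above: the write path derives the new flag purely from table
metadata (options.isChainTable() is forwarded as keepDelete), so chain tables
keep DELETE records through every compaction, presumably so downstream readers
of the chain can still observe deletions, while ordinary tables retain the old
drop-delete optimization.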
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index 11d3ede213..9769e59e17 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -455,6 +455,7 @@ public abstract class MergeTreeTestBase {
                 false,
                 options.needLookup(),
                 null,
+                false,
                 false);
     }
 
@@ -480,6 +481,7 @@ public abstract class MergeTreeTestBase {
                     false,
                     false,
                     null,
+                    false,
                     false);
         }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
index 4adff94778..07406baac5 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerTest.java
@@ -18,24 +18,54 @@
 
 package org.apache.paimon.mergetree.compact;
 
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.TestFileStore;
+import org.apache.paimon.TestKeyValueGenerator;
 import org.apache.paimon.compact.CompactResult;
 import org.apache.paimon.compact.CompactUnit;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.deletionvectors.DeletionVector;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFileTestUtils;
+import org.apache.paimon.io.FileReaderFactory;
+import org.apache.paimon.io.KeyValueFileReaderFactory;
+import org.apache.paimon.io.KeyValueFileWriterFactory;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.mergetree.Levels;
+import org.apache.paimon.mergetree.MergeSorter;
 import org.apache.paimon.mergetree.SortedRun;
+import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.operation.SplitRead;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.types.RowKind;
 
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
@@ -43,6 +73,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.format.FileFormat.fileFormat;
 import static org.apache.paimon.io.DataFileTestUtils.newFile;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -53,6 +84,8 @@ public class MergeTreeCompactManagerTest {
 
     private static ExecutorService service;
 
+    @TempDir java.nio.file.Path tempDir;
+
     @BeforeAll
     public static void before() {
         service = Executors.newSingleThreadExecutor();
@@ -209,6 +242,7 @@ public class MergeTreeCompactManagerTest {
                         false,
                         true,
                         null,
+                        false,
                         false);
 
         MergeTreeCompactManager defaultManager =
@@ -225,12 +259,242 @@ public class MergeTreeCompactManagerTest {
                         false,
                         false,
                         null,
+                        false,
                         false);
 
         assertThat(lookupManager.compactNotCompleted()).isTrue();
         assertThat(defaultManager.compactNotCompleted()).isFalse();
     }
 
+    @Test
+    public void testCompactWithKeepDelete() throws Exception {
+        // 1) Build a minimal TestFileStore with primary key and one bucket.
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(
+                        new SchemaManager(new LocalFileIO(), new Path(tempDir.toUri())),
+                        new Schema(
+                                TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(),
+                                TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
+                                TestKeyValueGenerator.getPrimaryKeys(
+                                        TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED),
+                                Collections.emptyMap(),
+                                null));
+        TestFileStore store =
+                new TestFileStore.Builder(
+                                "avro",
+                                tempDir.toString(),
+                                1,
+                                TestKeyValueGenerator.DEFAULT_PART_TYPE,
+                                TestKeyValueGenerator.KEY_TYPE,
+                                TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+                                TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
+                                DeduplicateMergeFunction.factory(),
+                                tableSchema)
+                        .build();
+
+        // 2) Write INSERT + DELETE for the same key into level-0 files.
+        int shopId = 1;
+        long orderId = 42L;
+        String dt = "20240101";
+        int hr = 0;
+
+        GenericRow keyRow = GenericRow.of(shopId, orderId);
+        GenericRow fullRow =
+                GenericRow.of(
+                        BinaryString.fromString(dt),
+                        hr,
+                        shopId,
+                        orderId,
+                        100L,
+                        null,
+                        BinaryString.fromString("comment"));
+
+        BinaryRow keyBinaryRow = TestKeyValueGenerator.KEY_SERIALIZER.toBinaryRow(keyRow).copy();
+        BinaryRow valueBinaryRow =
+                TestKeyValueGenerator.DEFAULT_ROW_SERIALIZER.toBinaryRow(fullRow).copy();
+
+        InternalRowSerializer partitionSerializer =
+                new InternalRowSerializer(TestKeyValueGenerator.DEFAULT_PART_TYPE);
+        BinaryRow partition =
+                partitionSerializer
+                        .toBinaryRow(GenericRow.of(BinaryString.fromString(dt), hr))
+                        .copy();
+
+        KeyValue insert = new KeyValue().replace(keyBinaryRow, 0L, RowKind.INSERT, valueBinaryRow);
+        KeyValue delete = new KeyValue().replace(keyBinaryRow, 1L, RowKind.DELETE, valueBinaryRow);
+
+        List<KeyValue> kvs = Arrays.asList(insert, delete);
+
+        List<Snapshot> snapshots =
+                store.commitData(kvs, kv -> partition, kv -> 0 /* single bucket */);
+        Snapshot snapshot = snapshots.get(snapshots.size() - 1);
+        long snapshotId = snapshot.id();
+
+        // Collect input files from manifest entries of this snapshot.
+        FileStoreScan scan = store.newScan();
+        List<ManifestEntry> manifestEntries = scan.withSnapshot(snapshotId).plan().files();
+
+        List<DataFileMeta> inputFiles =
+                manifestEntries.stream()
+                        .filter(e -> e.kind() == FileKind.ADD)
+                        .map(ManifestEntry::file)
+                        .collect(Collectors.toList());
+
+        assertThat(inputFiles).isNotEmpty();
+
+        // 3) Create a MergeTreeCompactManager with keepDelete=true and run compaction.
+        Comparator<InternalRow> keyComparator =
+                Comparator.<InternalRow>comparingInt(r -> r.getInt(0))
+                        .thenComparingLong(r -> r.getLong(1));
+
+        CoreOptions coreOptions = store.options();
+        Levels levels = new Levels(keyComparator, inputFiles, coreOptions.numLevels());
+
+        // Always compact all runs into the highest level.
+        CompactStrategy strategy =
+                (numLevels, runs) -> Optional.of(CompactUnit.fromLevelRuns(numLevels - 1, runs));
+
+        KeyValueFileReaderFactory.Builder readerFactoryBuilder = store.newReaderFactoryBuilder();
+        KeyValueFileReaderFactory keyReaderFactory =
+                readerFactoryBuilder.build(partition, 0, DeletionVector.emptyFactory());
+        FileReaderFactory<KeyValue> readerFactory = keyReaderFactory;
+
+        KeyValueFileWriterFactory.Builder writerFactoryBuilder =
+                KeyValueFileWriterFactory.builder(
+                        store.fileIO(),
+                        snapshot.schemaId(),
+                        TestKeyValueGenerator.KEY_TYPE,
+                        TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+                        fileFormat(coreOptions),
+                        ignore -> store.pathFactory(),
+                        coreOptions.targetFileSize(true));
+        KeyValueFileWriterFactory writerFactory =
+                writerFactoryBuilder.build(partition, 0, coreOptions);
+
+        MergeSorter mergeSorter =
+                new MergeSorter(
+                        coreOptions,
+                        TestKeyValueGenerator.KEY_TYPE,
+                        TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+                        null);
+
+        MergeTreeCompactRewriter rewriter =
+                new MergeTreeCompactRewriter(
+                        readerFactory,
+                        writerFactory,
+                        keyComparator,
+                        null,
+                        DeduplicateMergeFunction.factory(),
+                        mergeSorter);
+
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        MergeTreeCompactManager manager =
+                new MergeTreeCompactManager(
+                        executor,
+                        levels,
+                        strategy,
+                        keyComparator,
+                        coreOptions.compactionFileSize(true),
+                        coreOptions.numSortedRunStopTrigger(),
+                        rewriter,
+                        null,
+                        null,
+                        false,
+                        false,
+                        null,
+                        false,
+                        true); // keepDelete=true
+
+        try {
+            manager.triggerCompaction(false);
+            Optional<CompactResult> resultOptional = manager.getCompactionResult(true);
+            assertThat(resultOptional).isPresent();
+            CompactResult compactResult = resultOptional.get();
+
+            List<DataFileMeta> compactedFiles = compactResult.after();
+            assertThat(compactedFiles).isNotEmpty();
+
+            int highestLevelBefore =
+                    inputFiles.stream().mapToInt(DataFileMeta::level).max().orElse(0);
+            for (DataFileMeta file : compactedFiles) {
+                assertThat(file.level()).isNotZero();
+                assertThat(file.level()).isGreaterThanOrEqualTo(highestLevelBefore);
+            }
+
+            // 4) Read back from compacted files (via manifest entries) without DropDeleteReader
+            // and assert a DELETE record exists.
+            int totalBuckets = coreOptions.bucket();
+            int bucket = 0;
+            List<ManifestEntry> compactedEntries =
+                    compactedFiles.stream()
+                            .map(
+                                    file ->
+                                            ManifestEntry.create(
+                                                    FileKind.ADD,
+                                                    partition,
+                                                    bucket,
+                                                    totalBuckets,
+                                                    file))
+                            .collect(Collectors.toList());
+
+            SplitRead<KeyValue> read = store.newRead().forceKeepDelete();
+
+            Map<BinaryRow, Map<Integer, List<DataFileMeta>>> filesPerPartitionAndBucket =
+                    new HashMap<>();
+            for (ManifestEntry entry : compactedEntries) {
+                filesPerPartitionAndBucket
+                        .computeIfAbsent(entry.partition(), p -> new HashMap<>())
+                        .computeIfAbsent(entry.bucket(), b -> new ArrayList<>())
+                        .add(entry.file());
+            }
+
+            List<KeyValue> readBack = new ArrayList<>();
+            for (Map.Entry<BinaryRow, Map<Integer, List<DataFileMeta>>> entry :
+                    filesPerPartitionAndBucket.entrySet()) {
+                BinaryRow part = entry.getKey();
+                for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry :
+                        entry.getValue().entrySet()) {
+                    RecordReader<KeyValue> reader =
+                            read.createReader(
+                                    DataSplit.builder()
+                                            .withPartition(part)
+                                            .withBucket(bucketEntry.getKey())
+                                            .withDataFiles(bucketEntry.getValue())
+                                            .isStreaming(false)
+                                            .rawConvertible(false)
+                                            .withBucketPath("not used")
+                                            .build());
+                    RecordReaderIterator<KeyValue> iterator = new RecordReaderIterator<>(reader);
+                    try {
+                        while (iterator.hasNext()) {
+                            readBack.add(
+                                    iterator.next()
+                                            .copy(
+                                                    TestKeyValueGenerator.KEY_SERIALIZER,
+                                                    TestKeyValueGenerator.DEFAULT_ROW_SERIALIZER));
+                        }
+                    } finally {
+                        iterator.close();
+                    }
+                }
+            }
+
+            List<KeyValue> recordsForKey =
+                    readBack.stream()
+                            .filter(
+                                    kv ->
+                                            kv.key().getInt(0) == shopId
+                                                    && kv.key().getLong(1) == orderId)
+                            .collect(Collectors.toList());
+
+            assertThat(recordsForKey).isNotEmpty();
+            assertThat(recordsForKey).allMatch(kv -> kv.valueKind() == RowKind.DELETE);
+        } finally {
+            manager.close();
+            executor.shutdownNow();
+        }
+    }
+
     private void innerTest(List<LevelMinMax> inputs, List<LevelMinMax> expected)
             throws ExecutionException, InterruptedException {
         innerTest(inputs, expected, testStrategy(), true);
@@ -262,6 +526,7 @@ public class MergeTreeCompactManagerTest {
                         false,
                         false,
                         null,
+                        false,
                         false);
         manager.triggerCompaction(false);
         manager.getCompactionResult(true);
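
To try the new behavior locally, one way to run just the added test (assuming a
standard Maven build from the repository root; the module name is taken from
the diff paths above) is:

    mvn -pl paimon-core test -Dtest='MergeTreeCompactManagerTest#testCompactWithKeepDelete'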
