This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a74615423 [spark][core] spark compact with deletion vector (#3971)
a74615423 is described below

commit a7461542392130bbb00aefe1aeb74bcf2edf992c
Author: Yann Byron <[email protected]>
AuthorDate: Tue Aug 20 18:51:31 2024 +0800

    [spark][core] spark compact with deletion vector (#3971)
---
 .../org/apache/paimon/AppendOnlyFileStore.java     |   8 ++
 .../org/apache/paimon/append/AppendOnlyWriter.java |  26 +++-
 .../append/BucketedAppendCompactManager.java       | 137 ++++++++++++++++-----
 .../paimon/append/UnawareAppendCompactionTask.java |  55 ++++++++-
 .../UnawareAppendTableCompactionCoordinator.java   |  74 +++++++++--
 .../org/apache/paimon/compact/CompactResult.java   |  20 +++
 .../DeletionVectorIndexFileWriter.java             |   4 +
 .../deletionvectors/DeletionVectorsIndexFile.java  |   2 +-
 .../append/AppendDeletionFileMaintainer.java       |  17 ++-
 .../BucketedAppendDeletionFileMaintainer.java      |  26 +++-
 .../UnawareAppendDeletionFileMaintainer.java       |  54 ++++++--
 .../paimon/manifest/IndexManifestFileHandler.java  |  21 +++-
 .../apache/paimon/mergetree/MergeTreeWriter.java   |   2 +-
 .../paimon/operation/AbstractFileStoreWrite.java   |  10 +-
 .../paimon/operation/AppendOnlyFileStoreWrite.java |  83 +++++++++----
 .../paimon/operation/KeyValueFileStoreWrite.java   |   1 +
 .../paimon/table/AppendOnlyFileStoreTable.java     |   4 +-
 .../org/apache/paimon/utils/CommitIncrement.java   |  15 +++
 .../append/AppendOnlyTableCompactionITTest.java    |   2 +-
 .../apache/paimon/append/AppendOnlyWriterTest.java |   1 +
 .../append/BucketedAppendCompactManagerTest.java   |   1 +
 .../apache/paimon/append/FullCompactTaskTest.java  |   2 +-
 .../append/AppendDeletionFileMaintainerTest.java   |   6 +-
 .../apache/paimon/format/FileFormatSuffixTest.java |   2 +-
 .../flink/compact/UnawareBucketCompactor.java      |   2 +-
 .../paimon/spark/procedure/CompactProcedure.java   |   2 +-
 .../paimon/spark/commands/PaimonSparkWriter.scala  |   2 +-
 .../paimon/spark/sql/DeletionVectorTest.scala      |  90 ++++++++++----
 28 files changed, 531 insertions(+), 138 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index 3cd7bb3b6..1e50da660 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -19,6 +19,7 @@
 package org.apache.paimon;
 
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
 import org.apache.paimon.format.FileFormatDiscover;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.manifest.ManifestCacheFilter;
@@ -93,6 +94,11 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
     @Override
     public AppendOnlyFileStoreWrite newWrite(
             String commitUser, ManifestCacheFilter manifestFilter) {
+        DeletionVectorsMaintainer.Factory deletionVectorsMaintainerFactory = 
null;
+        if (options.deletionVectorsEnabled()) {
+            deletionVectorsMaintainerFactory =
+                    new 
DeletionVectorsMaintainer.Factory(newIndexFileHandler());
+        }
         return new AppendOnlyFileStoreWrite(
                 fileIO,
                 newRead(),
@@ -103,6 +109,8 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
                 snapshotManager(),
                 newScan(true).withManifestCacheFilter(manifestFilter),
                 options,
+                bucketMode(),
+                deletionVectorsMaintainerFactory,
                 tableName);
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 136cfd246..aeb55d497 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -27,10 +27,12 @@ import org.apache.paimon.disk.RowBuffer;
 import org.apache.paimon.fileindex.FileIndexOptions;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.io.IndexIncrement;
 import org.apache.paimon.io.RowDataRollingFileWriter;
 import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.memory.MemoryOwner;
@@ -75,6 +77,8 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow>, MemoryOwner
     private final List<DataFileMeta> deletedFiles;
     private final List<DataFileMeta> compactBefore;
     private final List<DataFileMeta> compactAfter;
+    private final List<IndexFileMeta> indexFilesBefore;
+    private final List<IndexFileMeta> indexFilesAfter;
     private final LongCounter seqNumCounter;
     private final String fileCompression;
     private final String spillCompression;
@@ -121,6 +125,8 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow>, MemoryOwner
         this.deletedFiles = new ArrayList<>();
         this.compactBefore = new ArrayList<>();
         this.compactAfter = new ArrayList<>();
+        this.indexFilesBefore = new ArrayList<>();
+        this.indexFilesAfter = new ArrayList<>();
         this.seqNumCounter = new LongCounter(maxSequenceNumber + 1);
         this.fileCompression = fileCompression;
         this.spillCompression = spillCompression;
@@ -139,6 +145,10 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow>, MemoryOwner
             deletedFiles.addAll(increment.newFilesIncrement().deletedFiles());
             compactBefore.addAll(increment.compactIncrement().compactBefore());
             compactAfter.addAll(increment.compactIncrement().compactAfter());
+            if (increment.indexIncrement() != null) {
+                
indexFilesBefore.addAll(increment.indexIncrement().deletedIndexFiles());
+                
indexFilesAfter.addAll(increment.indexIncrement().newIndexFiles());
+            }
         }
     }
 
@@ -276,6 +286,11 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow>, MemoryOwner
                         result -> {
                             compactBefore.addAll(result.before());
                             compactAfter.addAll(result.after());
+                            if (result.indexIncrement() != null) {
+                                indexFilesBefore.addAll(
+                                        
result.indexIncrement().deletedIndexFiles());
+                                
indexFilesAfter.addAll(result.indexIncrement().newIndexFiles());
+                            }
                         });
     }
 
@@ -291,12 +306,21 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow>, MemoryOwner
                         new ArrayList<>(compactAfter),
                         Collections.emptyList());
 
+        IndexIncrement indexIncrement = null;
+        if (!indexFilesBefore.isEmpty() || !indexFilesAfter.isEmpty()) {
+            indexIncrement =
+                    new IndexIncrement(
+                            new ArrayList<>(indexFilesAfter), new 
ArrayList<>(indexFilesBefore));
+        }
+
         newFiles.clear();
         deletedFiles.clear();
         compactBefore.clear();
         compactAfter.clear();
+        indexFilesBefore.clear();
+        indexFilesAfter.clear();
 
-        return new CommitIncrement(dataIncrement, compactIncrement, null);
+        return new CommitIncrement(dataIncrement, compactIncrement, 
indexIncrement, null);
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
 
b/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
index d44793613..ea48e608e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
@@ -23,9 +23,15 @@ import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.compact.CompactFutureManager;
 import org.apache.paimon.compact.CompactResult;
 import org.apache.paimon.compact.CompactTask;
+import org.apache.paimon.deletionvectors.append.AppendDeletionFileMaintainer;
+import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.IndexIncrement;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.operation.metrics.CompactionMetrics;
 import org.apache.paimon.operation.metrics.MetricUtils;
+import org.apache.paimon.table.source.DeletionFile;
 import org.apache.paimon.utils.Preconditions;
 
 import org.slf4j.Logger;
@@ -36,6 +42,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
@@ -52,6 +59,7 @@ public class BucketedAppendCompactManager extends 
CompactFutureManager {
     private static final int FULL_COMPACT_MIN_FILE = 3;
 
     private final ExecutorService executor;
+    private final AppendDeletionFileMaintainer dvIndexFileMaintainer;
     private final TreeSet<DataFileMeta> toCompact;
     private final int minFileNum;
     private final int maxFileNum;
@@ -65,12 +73,14 @@ public class BucketedAppendCompactManager extends 
CompactFutureManager {
     public BucketedAppendCompactManager(
             ExecutorService executor,
             List<DataFileMeta> restored,
+            @Nullable AppendDeletionFileMaintainer dvIndexFileMaintainer,
             int minFileNum,
             int maxFileNum,
             long targetFileSize,
             CompactRewriter rewriter,
             @Nullable CompactionMetrics.Reporter metricsReporter) {
         this.executor = executor;
+        this.dvIndexFileMaintainer = dvIndexFileMaintainer;
         this.toCompact = new TreeSet<>(fileComparator(false));
         this.toCompact.addAll(restored);
         this.minFileNum = minFileNum;
@@ -94,13 +104,20 @@ public class BucketedAppendCompactManager extends 
CompactFutureManager {
                 taskFuture == null,
                 "A compaction task is still running while the user "
                         + "forces a new compaction. This is unexpected.");
-        if (toCompact.size() < FULL_COMPACT_MIN_FILE) {
+        // if deletion vectors are enabled, always trigger compaction.
+        if (toCompact.isEmpty()
+                || (dvIndexFileMaintainer == null && toCompact.size() < 
FULL_COMPACT_MIN_FILE)) {
             return;
         }
 
         taskFuture =
                 executor.submit(
-                        new FullCompactTask(toCompact, targetFileSize, 
rewriter, metricsReporter));
+                        new FullCompactTask(
+                                dvIndexFileMaintainer,
+                                toCompact,
+                                targetFileSize,
+                                rewriter,
+                                metricsReporter));
         compacting = new ArrayList<>(toCompact);
         toCompact.clear();
     }
@@ -113,7 +130,9 @@ public class BucketedAppendCompactManager extends 
CompactFutureManager {
         if (picked.isPresent()) {
             compacting = picked.get();
             taskFuture =
-                    executor.submit(new AutoCompactTask(compacting, rewriter, 
metricsReporter));
+                    executor.submit(
+                            new AutoCompactTask(
+                                    dvIndexFileMaintainer, compacting, 
rewriter, metricsReporter));
         }
     }
 
@@ -207,17 +226,20 @@ public class BucketedAppendCompactManager extends 
CompactFutureManager {
     /** A {@link CompactTask} impl for full compaction of append-only table. */
     public static class FullCompactTask extends CompactTask {
 
-        private final LinkedList<DataFileMeta> inputs;
+        private final AppendDeletionFileMaintainer dvIndexFileMaintainer;
+        private final LinkedList<DataFileMeta> toCompact;
         private final long targetFileSize;
         private final CompactRewriter rewriter;
 
         public FullCompactTask(
+                AppendDeletionFileMaintainer dvIndexFileMaintainer,
                 Collection<DataFileMeta> inputs,
                 long targetFileSize,
                 CompactRewriter rewriter,
                 @Nullable CompactionMetrics.Reporter metricsReporter) {
             super(metricsReporter);
-            this.inputs = new LinkedList<>(inputs);
+            this.dvIndexFileMaintainer = dvIndexFileMaintainer;
+            this.toCompact = new LinkedList<>(inputs);
             this.targetFileSize = targetFileSize;
             this.rewriter = rewriter;
         }
@@ -225,34 +247,42 @@ public class BucketedAppendCompactManager extends 
CompactFutureManager {
         @Override
         protected CompactResult doCompact() throws Exception {
             // remove large files
-            while (!inputs.isEmpty()) {
-                DataFileMeta file = inputs.peekFirst();
-                if (file.fileSize() >= targetFileSize) {
-                    inputs.poll();
+            while (!toCompact.isEmpty()) {
+                DataFileMeta file = toCompact.peekFirst();
+                // a data file with a deletion file always needs to be compacted.
+                if (file.fileSize() >= targetFileSize && 
!hasDeletionFile(file)) {
+                    toCompact.poll();
                     continue;
                 }
                 break;
             }
 
-            // compute small files
-            int big = 0;
-            int small = 0;
-            for (DataFileMeta file : inputs) {
-                if (file.fileSize() >= targetFileSize) {
-                    big++;
+            // do compaction
+            if (dvIndexFileMaintainer != null) {
+                // if deletion vectors are enabled, always trigger compaction.
+                return compact(dvIndexFileMaintainer, toCompact, rewriter);
+            } else {
+                // compute small files
+                int big = 0;
+                int small = 0;
+                for (DataFileMeta file : toCompact) {
+                    if (file.fileSize() >= targetFileSize) {
+                        big++;
+                    } else {
+                        small++;
+                    }
+                }
+                if (small > big && toCompact.size() >= FULL_COMPACT_MIN_FILE) {
+                    return compact(dvIndexFileMaintainer, toCompact, rewriter);
                 } else {
-                    small++;
+                    return result(Collections.emptyList(), 
Collections.emptyList());
                 }
             }
+        }
 
-            // do compaction
-            List<DataFileMeta> compactBefore = new ArrayList<>();
-            List<DataFileMeta> compactAfter = new ArrayList<>();
-            if (small > big && inputs.size() >= FULL_COMPACT_MIN_FILE) {
-                compactBefore = new ArrayList<>(inputs);
-                compactAfter = rewriter.rewrite(inputs);
-            }
-            return result(new ArrayList<>(compactBefore), compactAfter);
+        private boolean hasDeletionFile(DataFileMeta file) {
+            return dvIndexFileMaintainer != null
+                    && dvIndexFileMaintainer.getDeletionFile(file.fileName()) != null;
         }
     }
 
@@ -265,36 +295,75 @@ public class BucketedAppendCompactManager extends 
CompactFutureManager {
      */
     public static class AutoCompactTask extends CompactTask {
 
+        private final AppendDeletionFileMaintainer dvIndexFileMaintainer;
         private final List<DataFileMeta> toCompact;
         private final CompactRewriter rewriter;
 
         public AutoCompactTask(
+                AppendDeletionFileMaintainer dvIndexFileMaintainer,
                 List<DataFileMeta> toCompact,
                 CompactRewriter rewriter,
                 @Nullable CompactionMetrics.Reporter metricsReporter) {
             super(metricsReporter);
+            this.dvIndexFileMaintainer = dvIndexFileMaintainer;
             this.toCompact = toCompact;
             this.rewriter = rewriter;
         }
 
         @Override
         protected CompactResult doCompact() throws Exception {
+            return compact(dvIndexFileMaintainer, toCompact, rewriter);
+        }
+    }
+
+    private static CompactResult compact(
+            AppendDeletionFileMaintainer dvIndexFileMaintainer,
+            List<DataFileMeta> toCompact,
+            CompactRewriter rewriter)
+            throws Exception {
+        if (dvIndexFileMaintainer == null) {
             return result(toCompact, rewriter.rewrite(toCompact));
+        } else {
+            List<DeletionFile> deletionFiles = new ArrayList<>();
+            for (DataFileMeta dataFile : toCompact) {
+                
deletionFiles.add(dvIndexFileMaintainer.getDeletionFile(dataFile.fileName()));
+            }
+            List<DataFileMeta> compactAfter = rewriter.rewrite(toCompact);
+            toCompact.forEach(f -> 
dvIndexFileMaintainer.notifyRemovedDeletionVector(f.fileName()));
+
+            List<IndexManifestEntry> indexManifestEntries = 
dvIndexFileMaintainer.persist();
+            if (indexManifestEntries.isEmpty()) {
+                return result(toCompact, compactAfter);
+            } else {
+                List<IndexFileMeta> indexFilesBefore = new ArrayList<>();
+                List<IndexFileMeta> indexFilesAfter = new ArrayList<>();
+                for (IndexManifestEntry entry : indexManifestEntries) {
+                    if (entry.kind() == FileKind.ADD) {
+                        indexFilesAfter.add(entry.indexFile());
+                    } else {
+                        indexFilesBefore.add(entry.indexFile());
+                    }
+                }
+                return result(toCompact, indexFilesBefore, compactAfter, 
indexFilesAfter);
+            }
         }
     }
 
     private static CompactResult result(List<DataFileMeta> before, 
List<DataFileMeta> after) {
-        return new CompactResult() {
-            @Override
-            public List<DataFileMeta> before() {
-                return before;
-            }
+        return new CompactResult(before, after);
+    }
 
-            @Override
-            public List<DataFileMeta> after() {
-                return after;
-            }
-        };
+    private static CompactResult result(
+            List<DataFileMeta> before,
+            @Nullable List<IndexFileMeta> indexFilesBefore,
+            List<DataFileMeta> after,
+            @Nullable List<IndexFileMeta> indexFilesAfter) {
+        CompactResult result = new CompactResult(before, after);
+        if (indexFilesBefore != null || indexFilesAfter != null) {
+            IndexIncrement indexIncrement = new 
IndexIncrement(indexFilesAfter, indexFilesBefore);
+            result.setIndexIncrement(indexIncrement);
+        }
+        return result;
     }
 
     /** Compact rewriter for append-only table. */
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendCompactionTask.java
 
b/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendCompactionTask.java
index 3d81cdbf6..5b949cb25 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendCompactionTask.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendCompactionTask.java
@@ -19,10 +19,17 @@
 package org.apache.paimon.append;
 
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.deletionvectors.append.AppendDeletionFileMaintainer;
+import 
org.apache.paimon.deletionvectors.append.UnawareAppendDeletionFileMaintainer;
+import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.io.IndexIncrement;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.utils.Preconditions;
@@ -31,6 +38,9 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.table.BucketMode.UNAWARE_BUCKET;
 
 /** Compaction task generated by {@link 
UnawareAppendTableCompactionCoordinator}. */
 public class UnawareAppendCompactionTask {
@@ -40,9 +50,7 @@ public class UnawareAppendCompactionTask {
     private final List<DataFileMeta> compactAfter;
 
     public UnawareAppendCompactionTask(BinaryRow partition, List<DataFileMeta> 
files) {
-        Preconditions.checkArgument(
-                files != null && files.size() > 1,
-                "AppendOnlyCompactionTask need more than one file input.");
+        Preconditions.checkArgument(files != null);
         this.partition = partition;
         compactBefore = new ArrayList<>(files);
         compactAfter = new ArrayList<>();
@@ -60,8 +68,42 @@ public class UnawareAppendCompactionTask {
         return compactAfter;
     }
 
-    public CommitMessage doCompact(AppendOnlyFileStoreWrite write) throws 
Exception {
-        compactAfter.addAll(write.compactRewrite(partition, 0, compactBefore));
+    public CommitMessage doCompact(FileStoreTable table, 
AppendOnlyFileStoreWrite write)
+            throws Exception {
+        boolean dvEnabled = table.coreOptions().deletionVectorsEnabled();
+        Preconditions.checkArgument(
+                dvEnabled || compactBefore.size() > 1,
+                "AppendOnlyCompactionTask need more than one file input.");
+        IndexIncrement indexIncrement;
+        if (dvEnabled) {
+            UnawareAppendDeletionFileMaintainer dvIndexFileMaintainer =
+                    (UnawareAppendDeletionFileMaintainer)
+                            AppendDeletionFileMaintainer.forUnawareAppend(
+                                    table.store().newIndexFileHandler(),
+                                    table.snapshotManager().latestSnapshotId(),
+                                    partition);
+            compactAfter.addAll(
+                    write.compactRewrite(
+                            partition, UNAWARE_BUCKET, dvIndexFileMaintainer, 
compactBefore));
+
+            compactBefore.forEach(
+                    f -> {
+                        
dvIndexFileMaintainer.notifyRemovedDeletionVector(f.fileName());
+                    });
+            List<IndexManifestEntry> indexEntries = 
dvIndexFileMaintainer.persist();
+            Preconditions.checkArgument(
+                    indexEntries.stream().noneMatch(i -> i.kind() == 
FileKind.ADD));
+            List<IndexFileMeta> removed =
+                    indexEntries.stream()
+                            .map(IndexManifestEntry::indexFile)
+                            .collect(Collectors.toList());
+            indexIncrement = new IndexIncrement(Collections.emptyList(), 
removed);
+        } else {
+            compactAfter.addAll(
+                    write.compactRewrite(partition, UNAWARE_BUCKET, null, 
compactBefore));
+            indexIncrement = new IndexIncrement(Collections.emptyList());
+        }
+
         CompactIncrement compactIncrement =
                 new CompactIncrement(compactBefore, compactAfter, 
Collections.emptyList());
         return new CommitMessageImpl(
@@ -69,7 +111,8 @@ public class UnawareAppendCompactionTask {
                 0, // bucket 0 is bucket for unaware-bucket table for 
compatibility with the old
                 // design
                 DataIncrement.emptyIncrement(),
-                compactIncrement);
+                compactIncrement,
+                indexIncrement);
     }
 
     public int hashCode() {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
 
b/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
index 732300bd3..48239f6f3 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
@@ -21,6 +21,10 @@ package org.apache.paimon.append;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.deletionvectors.append.AppendDeletionFileMaintainer;
+import 
org.apache.paimon.deletionvectors.append.UnawareAppendDeletionFileMaintainer;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.FileStoreTable;
@@ -38,6 +42,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -60,12 +65,15 @@ public class UnawareAppendTableCompactionCoordinator {
     protected static final int REMOVE_AGE = 10;
     protected static final int COMPACT_AGE = 5;
 
+    @Nullable private final Long snapshotId;
     private final InnerTableScan scan;
     private final long targetFileSize;
     private final long compactionFileSize;
     private final int minFileNum;
     private final int maxFileNum;
     private final boolean streamingMode;
+    private final IndexFileHandler indexFileHandler;
+    private final boolean deletionVectorEnabled;
 
     final Map<BinaryRow, PartitionCompactCoordinator> 
partitionCompactCoordinators =
             new HashMap<>();
@@ -90,6 +98,7 @@ public class UnawareAppendTableCompactionCoordinator {
         if (filter != null) {
             scan.withFilter(filter);
         }
+        this.snapshotId = table.snapshotManager().latestSnapshotId();
         this.streamingMode = isStreaming;
         CoreOptions coreOptions = table.coreOptions();
         this.targetFileSize = coreOptions.targetFileSize(false);
@@ -97,6 +106,8 @@ public class UnawareAppendTableCompactionCoordinator {
         this.minFileNum = coreOptions.compactionMinFileNum();
         // this is global compaction, avoid too many compaction tasks
         this.maxFileNum = coreOptions.compactionMaxFileNum().orElse(50);
+        this.indexFileHandler = table.store().newIndexFileHandler();
+        this.deletionVectorEnabled = coreOptions.deletionVectorsEnabled();
     }
 
     public List<UnawareAppendCompactionTask> run() {
@@ -130,12 +141,30 @@ public class UnawareAppendTableCompactionCoordinator {
 
     @VisibleForTesting
     void notifyNewFiles(BinaryRow partition, List<DataFileMeta> files) {
+        UnawareAppendDeletionFileMaintainer dvIndexFileMaintainer;
+        if (deletionVectorEnabled) {
+            dvIndexFileMaintainer =
+                    (UnawareAppendDeletionFileMaintainer)
+                            AppendDeletionFileMaintainer.forUnawareAppend(
+                                    indexFileHandler, snapshotId, partition);
+        } else {
+            dvIndexFileMaintainer = null;
+        }
+        java.util.function.Predicate<DataFileMeta> filter =
+                file -> {
+                    if (dvIndexFileMaintainer == null
+                            || 
dvIndexFileMaintainer.getDeletionFile(file.fileName()) == null) {
+                        return file.fileSize() < compactionFileSize;
+                    }
+                    // if a data file has a deletion file, it always needs to be compacted.
+                    return true;
+                };
+        List<DataFileMeta> toCompact = 
files.stream().filter(filter).collect(Collectors.toList());
         partitionCompactCoordinators
-                .computeIfAbsent(partition, PartitionCompactCoordinator::new)
-                .addFiles(
-                        files.stream()
-                                .filter(file -> file.fileSize() < 
compactionFileSize)
-                                .collect(Collectors.toList()));
+                .computeIfAbsent(
+                        partition,
+                        pp -> new 
PartitionCompactCoordinator(dvIndexFileMaintainer, partition))
+                .addFiles(toCompact);
     }
 
     @VisibleForTesting
@@ -181,11 +210,14 @@ public class UnawareAppendTableCompactionCoordinator {
     /** Coordinator for a single partition. */
     class PartitionCompactCoordinator {
 
+        private final UnawareAppendDeletionFileMaintainer 
dvIndexFileMaintainer;
         private final BinaryRow partition;
         private final HashSet<DataFileMeta> toCompact = new HashSet<>();
         int age = 0;
 
-        public PartitionCompactCoordinator(BinaryRow partition) {
+        public PartitionCompactCoordinator(
+                UnawareAppendDeletionFileMaintainer dvIndexFileMaintainer, 
BinaryRow partition) {
+            this.dvIndexFileMaintainer = dvIndexFileMaintainer;
             this.partition = partition;
         }
 
@@ -216,7 +248,12 @@ public class UnawareAppendTableCompactionCoordinator {
         }
 
         private List<List<DataFileMeta>> agePack() {
-            List<List<DataFileMeta>> packed = pack();
+            List<List<DataFileMeta>> packed;
+            if (dvIndexFileMaintainer == null) {
+                packed = pack(toCompact);
+            } else {
+                packed = packInDeletionVectorVMode(toCompact);
+            }
             if (packed.isEmpty()) {
                 // non-packed, we need to grow up age, and check whether to 
compact once
                 if (++age > COMPACT_AGE && toCompact.size() > 1) {
@@ -230,7 +267,7 @@ public class UnawareAppendTableCompactionCoordinator {
             return packed;
         }
 
-        private List<List<DataFileMeta>> pack() {
+        private List<List<DataFileMeta>> pack(Set<DataFileMeta> toCompact) {
             // we compact smaller files first
             // step 1, sort files by file size, pick the smaller first
             ArrayList<DataFileMeta> files = new ArrayList<>(toCompact);
@@ -252,6 +289,27 @@ public class UnawareAppendTableCompactionCoordinator {
             return result;
         }
 
+        private List<List<DataFileMeta>> 
packInDeletionVectorVMode(Set<DataFileMeta> toCompact) {
+            // we group the data files by their related index files.
+            Map<IndexFileMeta, List<DataFileMeta>> filesWithDV = new 
HashMap<>();
+            Set<DataFileMeta> rest = new HashSet<>();
+            for (DataFileMeta dataFile : toCompact) {
+                IndexFileMeta indexFile = 
dvIndexFileMaintainer.getIndexFile(dataFile.fileName());
+                if (indexFile == null) {
+                    rest.add(dataFile);
+                } else {
+                    filesWithDV.computeIfAbsent(indexFile, f -> new 
ArrayList<>()).add(dataFile);
+                }
+            }
+
+            List<List<DataFileMeta>> result = new ArrayList<>();
+            result.addAll(filesWithDV.values());
+            if (rest.size() > 1) {
+                result.addAll(pack(rest));
+            }
+            return result;
+        }
+
         /**
          * A file bin for {@link PartitionCompactCoordinator} determine 
whether ready to compact.
          */
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/compact/CompactResult.java 
b/paimon-core/src/main/java/org/apache/paimon/compact/CompactResult.java
index 08d7de5da..4de5d7315 100644
--- a/paimon-core/src/main/java/org/apache/paimon/compact/CompactResult.java
+++ b/paimon-core/src/main/java/org/apache/paimon/compact/CompactResult.java
@@ -19,6 +19,8 @@
 package org.apache.paimon.compact;
 
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.IndexIncrement;
+import org.apache.paimon.utils.Preconditions;
 
 import javax.annotation.Nullable;
 
@@ -32,6 +34,9 @@ public class CompactResult {
     private final List<DataFileMeta> before;
     private final List<DataFileMeta> after;
     private final List<DataFileMeta> changelog;
+    // TODO: unify IndexIncrement and CompactDeletionFile for both primary-key 
table and append-only
+    // table.
+    @Nullable private IndexIncrement indexIncrement;
 
     @Nullable private CompactDeletionFile deletionFile;
 
@@ -66,10 +71,25 @@ public class CompactResult {
         return changelog;
     }
 
+    public void setIndexIncrement(@Nullable IndexIncrement indexIncrement) {
+        Preconditions.checkArgument(
+                deletionFile == null,
+                "indexIncrement and deletionFile can't be set at the same 
time");
+        this.indexIncrement = indexIncrement;
+    }
+
     public void setDeletionFile(@Nullable CompactDeletionFile deletionFile) {
+        Preconditions.checkArgument(
+                indexIncrement == null,
+                "indexIncrement and deletionFile can't be set at the same 
time");
         this.deletionFile = deletionFile;
     }
 
+    @Nullable
+    public IndexIncrement indexIncrement() {
+        return indexIncrement;
+    }
+
     @Nullable
     public CompactDeletionFile deletionFile() {
         return deletionFile;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
index a1d85b5ba..f8c8330f1 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
@@ -54,6 +54,10 @@ public class DeletionVectorIndexFileWriter {
         this.targetSizeInBytes = targetSizePerIndexFile.getBytes();
     }
 
+    /**
+     * For unaware-bucket mode, this method may write out multiple index 
files; otherwise, it writes
+     * out only one index file.
+     */
     public List<IndexFileMeta> write(Map<String, DeletionVector> input) throws 
IOException {
         if (input.isEmpty()) {
             return emptyIndexFile();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
index ffb024734..798404e00 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
@@ -121,7 +121,7 @@ public class DeletionVectorsIndexFile extends IndexFile {
         return deletionVectors;
     }
 
-    public DeletionVector readDeletionVector(String dataFile, DeletionFile 
deletionFile) {
+    public DeletionVector readDeletionVector(DeletionFile deletionFile) {
         String indexFile = deletionFile.path();
         try (SeekableInputStream inputStream = fileIO.newInputStream(new 
Path(indexFile))) {
             checkVersion(inputStream);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainer.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainer.java
index 4922e45a6..c89eb4b54 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainer.java
@@ -36,7 +36,8 @@ import static 
org.apache.paimon.table.BucketMode.UNAWARE_BUCKET;
  * A maintainer to maintain deletion files for append table, the core methods:
  *
  * <ul>
- *   <li>{@link #notifyDeletionFiles}: Mark the deletion of data files, create 
new deletion vectors.
+ *   <li>{@link #notifyNewDeletionVector}: Mark the deletion of data files, 
create new deletion
+ *       vectors.
  *   <li>{@link #persist}: persist deletion files to commit.
  * </ul>
  */
@@ -46,7 +47,14 @@ public interface AppendDeletionFileMaintainer {
 
     int getBucket();
 
-    void notifyDeletionFiles(String dataFile, DeletionVector deletionVector);
+    DeletionFile getDeletionFile(String dataFile);
+
+    DeletionVector getDeletionVector(String dataFile);
+
+    void notifyNewDeletionVector(String dataFile, DeletionVector 
deletionVector);
+
+    /** In a compaction operation, notifies that the deletion file of a data 
file has been dropped. */
+    void notifyRemovedDeletionVector(String dataFile);
 
     List<IndexManifestEntry> persist();
 
@@ -60,7 +68,10 @@ public interface AppendDeletionFileMaintainer {
         DeletionVectorsMaintainer maintainer =
                 new DeletionVectorsMaintainer.Factory(indexFileHandler)
                         .createOrRestore(snapshotId, partition, bucket);
-        return new BucketedAppendDeletionFileMaintainer(partition, bucket, 
maintainer);
+        Map<String, DeletionFile> deletionFiles =
+                indexFileHandler.scanDVIndex(snapshotId, partition, bucket);
+        return new BucketedAppendDeletionFileMaintainer(
+                partition, bucket, deletionFiles, maintainer);
     }
 
     static AppendDeletionFileMaintainer forUnawareAppend(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BucketedAppendDeletionFileMaintainer.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BucketedAppendDeletionFileMaintainer.java
index 1b839575f..f8fcb218a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BucketedAppendDeletionFileMaintainer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BucketedAppendDeletionFileMaintainer.java
@@ -23,8 +23,10 @@ import org.apache.paimon.deletionvectors.DeletionVector;
 import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
 import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.table.source.DeletionFile;
 
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 /** A {@link AppendDeletionFileMaintainer} of bucketed append table. */
@@ -32,12 +34,17 @@ public class BucketedAppendDeletionFileMaintainer 
implements AppendDeletionFileM
 
     private final BinaryRow partition;
     private final int bucket;
+    private final Map<String, DeletionFile> dataFileToDeletionFile;
     private final DeletionVectorsMaintainer maintainer;
 
     BucketedAppendDeletionFileMaintainer(
-            BinaryRow partition, int bucket, DeletionVectorsMaintainer 
maintainer) {
+            BinaryRow partition,
+            int bucket,
+            Map<String, DeletionFile> deletionFiles,
+            DeletionVectorsMaintainer maintainer) {
         this.partition = partition;
         this.bucket = bucket;
+        this.dataFileToDeletionFile = deletionFiles;
         this.maintainer = maintainer;
     }
 
@@ -52,10 +59,25 @@ public class BucketedAppendDeletionFileMaintainer 
implements AppendDeletionFileM
     }
 
     @Override
-    public void notifyDeletionFiles(String dataFile, DeletionVector 
deletionVector) {
+    public DeletionFile getDeletionFile(String dataFile) {
+        return dataFileToDeletionFile.get(dataFile);
+    }
+
+    @Override
+    public DeletionVector getDeletionVector(String dataFile) {
+        return this.maintainer.deletionVectorOf(dataFile).orElse(null);
+    }
+
+    @Override
+    public void notifyNewDeletionVector(String dataFile, DeletionVector 
deletionVector) {
         maintainer.mergeNewDeletion(dataFile, deletionVector);
     }
 
+    @Override
+    public void notifyRemovedDeletionVector(String dataFile) {
+        maintainer.removeDeletionVectorOf(dataFile);
+    }
+
     @Override
     public List<IndexManifestEntry> persist() {
         return maintainer.writeDeletionVectorsIndex().stream()
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/UnawareAppendDeletionFileMaintainer.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/UnawareAppendDeletionFileMaintainer.java
index de6baac9f..0fe0074d7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/UnawareAppendDeletionFileMaintainer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/UnawareAppendDeletionFileMaintainer.java
@@ -46,6 +46,7 @@ public class UnawareAppendDeletionFileMaintainer implements 
AppendDeletionFileMa
     private final IndexFileHandler indexFileHandler;
 
     private final BinaryRow partition;
+    private final Map<String, DeletionFile> dataFileToDeletionFile;
     private final Map<String, IndexManifestEntry> indexNameToEntry = new 
HashMap<>();
 
     private final Map<String, Map<String, DeletionFile>> 
indexFileToDeletionFiles = new HashMap<>();
@@ -61,6 +62,7 @@ public class UnawareAppendDeletionFileMaintainer implements 
AppendDeletionFileMa
             Map<String, DeletionFile> deletionFiles) {
         this.indexFileHandler = indexFileHandler;
         this.partition = partition;
+        this.dataFileToDeletionFile = deletionFiles;
         // the deletion of data files is independent
         // just create an empty maintainer
         this.maintainer = new 
DeletionVectorsMaintainer.Factory(indexFileHandler).create();
@@ -102,19 +104,29 @@ public class UnawareAppendDeletionFileMaintainer 
implements AppendDeletionFileMa
         return UNAWARE_BUCKET;
     }
 
+    public DeletionFile getDeletionFile(String dataFile) {
+        return this.dataFileToDeletionFile.get(dataFile);
+    }
+
     @Override
-    public void notifyDeletionFiles(String dataFile, DeletionVector 
deletionVector) {
-        DeletionVectorsIndexFile deletionVectorsIndexFile = 
indexFileHandler.deletionVectorsIndex();
-        DeletionFile previous = null;
-        if (dataFileToIndexFile.containsKey(dataFile)) {
-            String indexFileName = dataFileToIndexFile.get(dataFile);
-            touchedIndexFiles.add(indexFileName);
-            if (indexFileToDeletionFiles.containsKey(indexFileName)) {
-                previous = 
indexFileToDeletionFiles.get(indexFileName).remove(dataFile);
-            }
+    public DeletionVector getDeletionVector(String dataFile) {
+        DeletionFile deletionFile = getDeletionFile(dataFile);
+        if (deletionFile != null) {
+            return 
indexFileHandler.deletionVectorsIndex().readDeletionVector(deletionFile);
         }
+        return null;
+    }
+
+    public void notifyRemovedDeletionVector(String dataFile) {
+        getRemovedDeletionFile(dataFile);
+    }
+
+    @Override
+    public void notifyNewDeletionVector(String dataFile, DeletionVector 
deletionVector) {
+        DeletionVectorsIndexFile deletionVectorsIndexFile = 
indexFileHandler.deletionVectorsIndex();
+        DeletionFile previous = getRemovedDeletionFile(dataFile);
         if (previous != null) {
-            
deletionVector.merge(deletionVectorsIndexFile.readDeletionVector(dataFile, 
previous));
+            
deletionVector.merge(deletionVectorsIndexFile.readDeletionVector(previous));
         }
         maintainer.notifyNewDeletion(dataFile, deletionVector);
     }
@@ -133,6 +145,28 @@ public class UnawareAppendDeletionFileMaintainer 
implements AppendDeletionFileMa
         return result;
     }
 
+    private DeletionFile getRemovedDeletionFile(String dataFile) {
+        if (dataFileToIndexFile.containsKey(dataFile)) {
+            String indexFileName = dataFileToIndexFile.get(dataFile);
+            touchedIndexFiles.add(indexFileName);
+            if (indexFileToDeletionFiles.containsKey(indexFileName)) {
+                return 
indexFileToDeletionFiles.get(indexFileName).remove(dataFile);
+            }
+        }
+        return null;
+    }
+
+    public IndexFileMeta getIndexFile(String dataFile) {
+        DeletionFile deletionFile = getDeletionFile(dataFile);
+        if (deletionFile == null) {
+            return null;
+        } else {
+            IndexManifestEntry entry =
+                    this.indexNameToEntry.get(new 
Path(deletionFile.path()).getName());
+            return entry == null ? null : entry.indexFile();
+        }
+    }
+
     @VisibleForTesting
     List<IndexManifestEntry> writeUnchangedDeletionVector() {
         DeletionVectorsIndexFile deletionVectorsIndexFile = 
indexFileHandler.deletionVectorsIndex();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java
 
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java
index 89fbb6080..b5719555f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java
@@ -30,6 +30,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
 import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
@@ -140,12 +141,20 @@ public class IndexManifestFileHandler {
                 indexEntries.put(identifier(entry), entry);
             }
 
-            for (IndexManifestEntry entry : newIndexFiles) {
-                if (entry.kind() == FileKind.ADD) {
-                    indexEntries.put(identifier(entry), entry);
-                } else {
-                    indexEntries.remove(identifier(entry));
-                }
+            // Deleted entries are processed first so that a DELETE cannot 
remove a newly added entry.
+            List<IndexManifestEntry> removed =
+                    newIndexFiles.stream()
+                            .filter(f -> f.kind() == FileKind.DELETE)
+                            .collect(Collectors.toList());
+            List<IndexManifestEntry> added =
+                    newIndexFiles.stream()
+                            .filter(f -> f.kind() == FileKind.ADD)
+                            .collect(Collectors.toList());
+            for (IndexManifestEntry entry : removed) {
+                indexEntries.remove(identifier(entry));
+            }
+            for (IndexManifestEntry entry : added) {
+                indexEntries.put(identifier(entry), entry);
             }
             return new ArrayList<>(indexEntries.values());
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
index aa60c8846..66ba6743f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
@@ -317,7 +317,7 @@ public class MergeTreeWriter implements 
RecordWriter<KeyValue>, MemoryOwner {
         compactChangelog.clear();
         this.compactDeletionFile = null;
 
-        return new CommitIncrement(dataIncrement, compactIncrement, 
drainDeletionFile);
+        return new CommitIncrement(dataIncrement, compactIncrement, null, 
drainDeletionFile);
     }
 
     private void trySyncLatestCompaction(boolean blocking) throws Exception {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index 459fec027..5e68e10cb 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -208,6 +208,7 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
                 WriterContainer<T> writerContainer = entry.getValue();
 
                 CommitIncrement increment = 
writerContainer.writer.prepareCommit(waitCompaction);
+                List<IndexFileMeta> deletedIndexFiles = new ArrayList<>();
                 List<IndexFileMeta> newIndexFiles = new ArrayList<>();
                 if (writerContainer.indexMaintainer != null) {
                     
newIndexFiles.addAll(writerContainer.indexMaintainer.prepareCommit());
@@ -216,13 +217,17 @@ public abstract class AbstractFileStoreWrite<T> 
implements FileStoreWrite<T> {
                 if (compactDeletionFile != null) {
                     
compactDeletionFile.getOrCompute().ifPresent(newIndexFiles::add);
                 }
+                if (increment.indexIncrement() != null) {
+                    
newIndexFiles.addAll(increment.indexIncrement().newIndexFiles());
+                    
deletedIndexFiles.addAll(increment.indexIncrement().deletedIndexFiles());
+                }
                 CommitMessageImpl committable =
                         new CommitMessageImpl(
                                 partition,
                                 bucket,
                                 increment.newFilesIncrement(),
                                 increment.compactIncrement(),
-                                new IndexIncrement(newIndexFiles));
+                                new IndexIncrement(newIndexFiles, 
deletedIndexFiles));
                 result.add(committable);
 
                 if (committable.isEmpty()) {
@@ -333,6 +338,7 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
         for (State<T> state : states) {
             RecordWriter<T> writer =
                     createWriter(
+                            state.baseSnapshotId,
                             state.partition,
                             state.bucket,
                             state.dataFiles,
@@ -408,6 +414,7 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
                                 ignorePreviousFiles ? null : latestSnapshotId, 
partition, bucket);
         RecordWriter<T> writer =
                 createWriter(
+                        latestSnapshotId,
                         partition.copy(),
                         bucket,
                         restoreFiles,
@@ -460,6 +467,7 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
     protected void notifyNewWriter(RecordWriter<T> writer) {}
 
     protected abstract RecordWriter<T> createWriter(
+            @Nullable Long snapshotId,
             BinaryRow partition,
             int bucket,
             List<DataFileMeta> restoreFiles,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index f4df43107..0dcf2c1bf 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -28,10 +28,12 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.deletionvectors.DeletionVector;
 import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.deletionvectors.append.AppendDeletionFileMaintainer;
 import org.apache.paimon.fileindex.FileIndexOptions;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.io.RowDataRollingFileWriter;
 import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.options.MemorySize;
@@ -58,6 +60,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
 
 /** {@link FileStoreWrite} for {@link AppendOnlyFileStore}. */
 public class AppendOnlyFileStoreWrite extends 
MemoryFileStoreWrite<InternalRow> {
@@ -81,10 +84,9 @@ public class AppendOnlyFileStoreWrite extends 
MemoryFileStoreWrite<InternalRow>
     private final MemorySize maxDiskSize;
     private final SimpleColStatsCollector.Factory[] statsCollectors;
     private final FileIndexOptions fileIndexOptions;
-
+    private final BucketMode bucketMode;
     private boolean forceBufferSpill = false;
     private boolean skipCompaction;
-    private BucketMode bucketMode = BucketMode.HASH_FIXED;
 
     public AppendOnlyFileStoreWrite(
             FileIO fileIO,
@@ -96,19 +98,36 @@ public class AppendOnlyFileStoreWrite extends 
MemoryFileStoreWrite<InternalRow>
             SnapshotManager snapshotManager,
             FileStoreScan scan,
             CoreOptions options,
+            BucketMode bucketMode,
+            @Nullable DeletionVectorsMaintainer.Factory 
deletionVectorsMaintainerFactory,
             String tableName) {
-        super(commitUser, snapshotManager, scan, options, null, null, 
tableName);
+        super(
+                commitUser,
+                snapshotManager,
+                scan,
+                options,
+                null,
+                deletionVectorsMaintainerFactory,
+                tableName);
         this.fileIO = fileIO;
         this.read = read;
         this.schemaId = schemaId;
         this.rowType = rowType;
         this.fileFormat = options.fileFormat();
         this.pathFactory = pathFactory;
+        this.bucketMode = bucketMode;
         this.targetFileSize = options.targetFileSize(false);
         this.compactionMinFileNum = options.compactionMinFileNum();
         this.compactionMaxFileNum = options.compactionMaxFileNum().orElse(5);
         this.commitForceCompact = options.commitForceCompact();
-        this.skipCompaction = options.writeOnly();
+        // AppendOnlyFileStoreWrite is sensitive to the bucket mode. It acts 
differently in
+        // unaware-bucket mode (no compaction and a forced empty writer).
+        if (bucketMode == BucketMode.BUCKET_UNAWARE) {
+            super.withIgnorePreviousFiles(true);
+            skipCompaction = true;
+        } else {
+            this.skipCompaction = options.writeOnly();
+        }
         this.fileCompression = options.fileCompression();
         this.spillCompression = options.spillCompression();
         this.useWriteBuffer = options.useWriteBufferForAppend();
@@ -121,23 +140,41 @@ public class AppendOnlyFileStoreWrite extends 
MemoryFileStoreWrite<InternalRow>
 
     @Override
     protected RecordWriter<InternalRow> createWriter(
+            @Nullable Long snapshotId,
             BinaryRow partition,
             int bucket,
             List<DataFileMeta> restoredFiles,
             long restoredMaxSeqNumber,
             @Nullable CommitIncrement restoreIncrement,
             ExecutorService compactExecutor,
-            @Nullable DeletionVectorsMaintainer ignore) {
+            @Nullable DeletionVectorsMaintainer dvMaintainer) {
+        AppendDeletionFileMaintainer dvIndexFileMaintainer;
+        if (!skipCompaction && dvMaintainer != null) {
+            dvIndexFileMaintainer =
+                    AppendDeletionFileMaintainer.forBucketedAppend(
+                            dvMaintainer.indexFileHandler(), snapshotId, 
partition, bucket);
+        } else {
+            dvIndexFileMaintainer = null;
+        }
+        // let writer and compact manager hold the same reference
+        // and make restore files mutable to update
+        DataFilePathFactory factory = 
pathFactory.createDataFilePathFactory(partition, bucket);
         CompactManager compactManager =
                 skipCompaction
                         ? new NoopCompactManager()
                         : new BucketedAppendCompactManager(
                                 compactExecutor,
                                 restoredFiles,
+                                dvIndexFileMaintainer,
                                 compactionMinFileNum,
                                 compactionMaxFileNum,
                                 targetFileSize,
-                                toCompact -> compactRewrite(partition, bucket, 
toCompact),
+                                toCompact ->
+                                        compactRewrite(
+                                                partition,
+                                                bucket,
+                                                dvIndexFileMaintainer,
+                                                toCompact),
                                 compactionMetrics == null
                                         ? null
                                         : 
compactionMetrics.createReporter(partition, bucket));
@@ -166,17 +203,11 @@ public class AppendOnlyFileStoreWrite extends 
MemoryFileStoreWrite<InternalRow>
                 options.asyncFileWrite());
     }
 
-    /** TODO remove this, and pass deletion vectors. */
-    public List<DataFileMeta> compactRewrite(
-            BinaryRow partition, int bucket, List<DataFileMeta> toCompact) 
throws Exception {
-        return compactRewrite(partition, bucket, toCompact, null);
-    }
-
     public List<DataFileMeta> compactRewrite(
             BinaryRow partition,
             int bucket,
-            List<DataFileMeta> toCompact,
-            @Nullable List<IOExceptionSupplier<DeletionVector>> dvFactories)
+            @Nullable AppendDeletionFileMaintainer dvIndexFileMaintainer,
+            List<DataFileMeta> toCompact)
             throws Exception {
         if (toCompact.isEmpty()) {
             return Collections.emptyList();
@@ -188,6 +219,18 @@ public class AppendOnlyFileStoreWrite extends 
MemoryFileStoreWrite<InternalRow>
                         bucket,
                         new LongCounter(toCompact.get(0).minSequenceNumber()),
                         FileSource.COMPACT);
+        List<IOExceptionSupplier<DeletionVector>> dvFactories =
+                dvIndexFileMaintainer == null
+                        ? null
+                        : toCompact.stream()
+                                .map(
+                                        f ->
+                                                
(IOExceptionSupplier<DeletionVector>)
+                                                        () ->
+                                                                
dvIndexFileMaintainer
+                                                                        
.getDeletionVector(
+                                                                               
 f.fileName()))
+                                .collect(Collectors.toList());
         try {
             rewriter.write(createFilesIterator(partition, bucket, toCompact, 
dvFactories));
         } catch (Exception e) {
@@ -199,7 +242,6 @@ public class AppendOnlyFileStoreWrite extends 
MemoryFileStoreWrite<InternalRow>
                 collectedExceptions = ExceptionUtils.firstOrSuppressed(e, 
collectedExceptions);
             }
         }
-
         if (collectedExceptions != null) {
             throw collectedExceptions;
         }
@@ -232,17 +274,6 @@ public class AppendOnlyFileStoreWrite extends 
MemoryFileStoreWrite<InternalRow>
         return new RecordReaderIterator<>(read.createReader(partition, bucket, 
files, dvFactories));
     }
 
-    public AppendOnlyFileStoreWrite withBucketMode(BucketMode bucketMode) {
-        // AppendOnlyFileStoreWrite is sensitive with bucket mode. It will act 
difference in
-        // unaware-bucket mode (no compaction and force empty-writer).
-        this.bucketMode = bucketMode;
-        if (bucketMode == BucketMode.BUCKET_UNAWARE) {
-            super.withIgnorePreviousFiles(true);
-            skipCompaction = true;
-        }
-        return this;
-    }
-
     @Override
     public void withIgnorePreviousFiles(boolean ignorePrevious) {
         // in unaware bucket mode, we need all writers to be empty
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 5f21d3c1a..9f4570cf2 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -172,6 +172,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
 
     @Override
     protected MergeTreeWriter createWriter(
+            @Nullable Long snapshotId,
             BinaryRow partition,
             int bucket,
             List<DataFileMeta> restoreFiles,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 4b6dd9b08..c9a32a251 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -138,9 +138,7 @@ class AppendOnlyFileStoreTable extends 
AbstractFileStoreTable {
     @Override
     public TableWriteImpl<InternalRow> newWrite(
             String commitUser, ManifestCacheFilter manifestFilter) {
-        // if this table is unaware-bucket table, we skip compaction and 
restored files searching
-        AppendOnlyFileStoreWrite writer =
-                store().newWrite(commitUser, 
manifestFilter).withBucketMode(bucketMode());
+        AppendOnlyFileStoreWrite writer = store().newWrite(commitUser, 
manifestFilter);
         return new TableWriteImpl<>(
                 rowType(),
                 writer,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/CommitIncrement.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/CommitIncrement.java
index 3c16378f8..18c7d5675 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/CommitIncrement.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/CommitIncrement.java
@@ -21,6 +21,7 @@ package org.apache.paimon.utils;
 import org.apache.paimon.compact.CompactDeletionFile;
 import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.io.IndexIncrement;
 
 import javax.annotation.Nullable;
 
@@ -29,14 +30,23 @@ public class CommitIncrement {
 
     private final DataIncrement dataIncrement;
     private final CompactIncrement compactIncrement;
+
+    // TODO: unify IndexIncrement and CompactDeletionFile for both primary-key 
table and append-only
+    // table.
+    @Nullable private final IndexIncrement indexIncrement;
     @Nullable private final CompactDeletionFile compactDeletionFile;
 
     public CommitIncrement(
             DataIncrement dataIncrement,
             CompactIncrement compactIncrement,
+            @Nullable IndexIncrement indexIncrement,
             @Nullable CompactDeletionFile compactDeletionFile) {
+        Preconditions.checkArgument(
+                indexIncrement == null || compactDeletionFile == null,
+                "indexIncrement and compactDeletionFile can't be set at the 
same time");
         this.dataIncrement = dataIncrement;
         this.compactIncrement = compactIncrement;
+        this.indexIncrement = indexIncrement;
         this.compactDeletionFile = compactDeletionFile;
     }
 
@@ -48,6 +58,11 @@ public class CommitIncrement {
         return compactIncrement;
     }
 
+    @Nullable
+    public IndexIncrement indexIncrement() {
+        return indexIncrement;
+    }
+
     @Nullable
     public CompactDeletionFile compactDeletionFile() {
         return compactDeletionFile;
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionITTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionITTest.java
index d23528e3f..82ce2e45a 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionITTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionITTest.java
@@ -166,7 +166,7 @@ public class AppendOnlyTableCompactionITTest {
             throws Exception {
         List<CommitMessage> result = new ArrayList<>();
         for (UnawareAppendCompactionTask task : tasks) {
-            result.add(task.doCompact(write));
+            result.add(task.doCompact(appendOnlyFileStoreTable, write));
         }
         return result;
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index d5c98d667..85385280e 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -595,6 +595,7 @@ public class AppendOnlyWriterTest {
                         Executors.newSingleThreadScheduledExecutor(
                                 new 
ExecutorThreadFactory("compaction-thread")),
                         toCompact,
+                        null,
                         MIN_FILE_NUM,
                         MAX_FILE_NUM,
                         targetFileSize,
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/BucketedAppendCompactManagerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/append/BucketedAppendCompactManagerTest.java
index fd374cc17..b9f4f7f4a 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/append/BucketedAppendCompactManagerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/BucketedAppendCompactManagerTest.java
@@ -203,6 +203,7 @@ public class BucketedAppendCompactManagerTest {
                 new BucketedAppendCompactManager(
                         null, // not used
                         toCompactBeforePick,
+                        null,
                         minFileNum,
                         maxFileNum,
                         targetFileSize,
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/FullCompactTaskTest.java 
b/paimon-core/src/test/java/org/apache/paimon/append/FullCompactTaskTest.java
index b2af4138c..e7c3cce01 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/append/FullCompactTaskTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/FullCompactTaskTest.java
@@ -123,7 +123,7 @@ public class FullCompactTaskTest {
                 Collection<DataFileMeta> inputs,
                 long targetFileSize,
                 BucketedAppendCompactManager.CompactRewriter rewriter) {
-            super(inputs, targetFileSize, rewriter, null);
+            super(null, inputs, targetFileSize, rewriter, null);
         }
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
index 2ebc30cf9..6c674352b 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
@@ -74,13 +74,13 @@ class AppendDeletionFileMaintainerTest {
                 store.createDVIFMaintainer(BinaryRow.EMPTY_ROW, 
dataFileToDeletionFiles);
 
         // no dv should be rewritten, because nothing is changed.
-        List<IndexManifestEntry> res = 
dvIFMaintainer.writeUnchangedDeletionVector();
+        List<IndexManifestEntry> res = dvIFMaintainer.persist();
         assertThat(res.size()).isEqualTo(0);
 
         // the dv of f3 is updated, and the index file that contains the dv of 
f3 should be marked
         // as REMOVE.
         FileIO fileIO = LocalFileIO.create();
-        dvIFMaintainer.notifyDeletionFiles(
+        dvIFMaintainer.notifyNewDeletionVector(
                 "f3", DeletionVector.read(fileIO, 
dataFileToDeletionFiles.get("f3")));
         res = dvIFMaintainer.writeUnchangedDeletionVector();
         assertThat(res.size()).isEqualTo(1);
@@ -88,7 +88,7 @@ class AppendDeletionFileMaintainerTest {
 
         // the dv of f1 and f2 are in one index file, and the dv of f1 is 
updated.
         // the dv of f2 need to be rewritten, and this index file should be 
marked as REMOVE.
-        dvIFMaintainer.notifyDeletionFiles(
+        dvIFMaintainer.notifyNewDeletionVector(
                 "f1", DeletionVector.read(fileIO, 
dataFileToDeletionFiles.get("f1")));
         res = dvIFMaintainer.writeUnchangedDeletionVector();
         assertThat(res.size()).isEqualTo(3);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java 
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index cb02b4343..13ac9736e 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -79,7 +79,7 @@ public class FileFormatSuffixTest extends 
KeyValueFileReadWriteTest {
                         SCHEMA,
                         0,
                         new BucketedAppendCompactManager(
-                                null, toCompact, 4, 10, 10, null, null), // 
not used
+                                null, toCompact, null, 4, 10, 10, null, null), 
// not used
                         null,
                         false,
                         dataFilePathFactory,
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactor.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactor.java
index ab37ff3b2..8c2b56777 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactor.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactor.java
@@ -60,7 +60,7 @@ public class UnawareBucketCompactor {
     }
 
     public void processElement(UnawareAppendCompactionTask task) throws 
Exception {
-        result.add(compactExecutorsupplier.get().submit(() -> 
task.doCompact(write)));
+        result.add(compactExecutorsupplier.get().submit(() -> 
task.doCompact(table, write)));
     }
 
     public void close() throws Exception {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 378418b2d..a6b11e7db 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -396,7 +396,7 @@ public class CompactProcedure extends BaseProcedure {
                                                                     
taskIterator.next());
                                                     messages.add(
                                                             
messageSer.serialize(
-                                                                    
task.doCompact(write)));
+                                                                    
task.doCompact(table, write)));
                                                 }
                                                 return messages.iterator();
                                             } finally {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index d0b2e86ea..dddbaf24b 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -273,7 +273,7 @@ case class PaimonSparkWriter(table: FileStoreTable) {
 
             sdv.dataFileAndDeletionVector.foreach {
               case (dataFileName, dv) =>
-                dvIndexFileMaintainer.notifyDeletionFiles(
+                dvIndexFileMaintainer.notifyNewDeletionVector(
                   dataFileName,
                   DeletionVector.deserializeFromBytes(dv))
             }
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
index 26d07ce06..719117bcc 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
@@ -58,7 +58,7 @@ class DeletionVectorTest extends PaimonSparkTestBase {
             new 
DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler())
 
           spark.sql("INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3, 'c')")
-          val deletionVectors1 = getLatestDeletionVectors(table, 
dvMaintainerFactory)
+          val deletionVectors1 = getAllLatestDeletionVectors(table, 
dvMaintainerFactory)
           Assertions.assertEquals(0, deletionVectors1.size)
 
           val cond1 = "id = 2"
@@ -67,7 +67,7 @@ class DeletionVectorTest extends PaimonSparkTestBase {
           checkAnswer(
             spark.sql(s"SELECT * from T ORDER BY id"),
             Row(1, "a") :: Row(2, "b_2") :: Row(3, "c") :: Nil)
-          val deletionVectors2 = getLatestDeletionVectors(table, 
dvMaintainerFactory)
+          val deletionVectors2 = getAllLatestDeletionVectors(table, 
dvMaintainerFactory)
           Assertions.assertEquals(1, deletionVectors2.size)
           deletionVectors2
             .foreach {
@@ -79,7 +79,7 @@ class DeletionVectorTest extends PaimonSparkTestBase {
           checkAnswer(
             spark.sql(s"SELECT * from T ORDER BY id"),
             Row(1, "a") :: Row(2, "b_2") :: Row(3, "c") :: Row(4, "d") :: 
Row(5, "e") :: Nil)
-          val deletionVectors3 = getLatestDeletionVectors(table, 
dvMaintainerFactory)
+          val deletionVectors3 = getAllLatestDeletionVectors(table, 
dvMaintainerFactory)
           Assertions.assertTrue(deletionVectors2 == deletionVectors3)
 
           val cond2 = "id % 2 = 1"
@@ -94,6 +94,16 @@ class DeletionVectorTest extends PaimonSparkTestBase {
             Row(1, "_all") :: Row(2, "_all") :: Row(3, "_all") :: Row(4, 
"_all") :: Row(
               5,
               "_all") :: Nil)
+
+          spark.sql("CALL sys.compact('T')")
+          val deletionVectors4 = getAllLatestDeletionVectors(table, 
dvMaintainerFactory)
+          // After compaction, deletionVectors should be empty
+          Assertions.assertTrue(deletionVectors4.isEmpty)
+          checkAnswer(
+            spark.sql(s"SELECT * from T ORDER BY id"),
+            Row(1, "_all") :: Row(2, "_all") :: Row(3, "_all") :: Row(4, 
"_all") :: Row(
+              5,
+              "_all") :: Nil)
         }
       }
   }
@@ -120,7 +130,7 @@ class DeletionVectorTest extends PaimonSparkTestBase {
 
           spark.sql(
             "INSERT INTO T VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c', 
'2025'), (4, 'd', '2025')")
-          val deletionVectors1 = getLatestDeletionVectors(table, 
dvMaintainerFactory)
+          val deletionVectors1 = getAllLatestDeletionVectors(table, 
dvMaintainerFactory)
           Assertions.assertEquals(0, deletionVectors1.size)
 
           val cond1 = "id = 2"
@@ -169,6 +179,17 @@ class DeletionVectorTest extends PaimonSparkTestBase {
               case (filePath, dv) =>
                 rowMetaInfo2(filePath).foreach(index => 
Assertions.assertTrue(dv.isDeleted(index)))
             }
+
+          spark.sql("CALL sys.compact('T')")
+          val deletionVectors5 = getAllLatestDeletionVectors(table, 
dvMaintainerFactory)
+          // After compaction, deletionVectors should be empty
+          Assertions.assertTrue(deletionVectors5.isEmpty)
+          checkAnswer(
+            spark.sql(s"SELECT * from T ORDER BY id"),
+            Row(1, "a", "2024") :: Row(2, "b_2", "2024") :: Row(3, "c_2", 
"2025") :: Row(
+              4,
+              "d_2",
+              "2025") :: Nil)
         }
       }
   }
@@ -194,14 +215,14 @@ class DeletionVectorTest extends PaimonSparkTestBase {
             new 
DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler())
 
           spark.sql("INSERT INTO T VALUES (1, 'a'), (2, 'b')")
-          val deletionVectors1 = getLatestDeletionVectors(table, 
dvMaintainerFactory)
+          val deletionVectors1 = getAllLatestDeletionVectors(table, 
dvMaintainerFactory)
           Assertions.assertEquals(0, deletionVectors1.size)
 
           val cond1 = "id = 2"
           val rowMetaInfo1 = getFilePathAndRowIndex(cond1)
           spark.sql(s"DELETE FROM T WHERE $cond1")
           checkAnswer(spark.sql(s"SELECT * from T ORDER BY id"), Row(1, "a") 
:: Nil)
-          val deletionVectors2 = getLatestDeletionVectors(table, 
dvMaintainerFactory)
+          val deletionVectors2 = getAllLatestDeletionVectors(table, 
dvMaintainerFactory)
           Assertions.assertEquals(1, deletionVectors2.size)
           deletionVectors2
             .foreach {
@@ -213,12 +234,18 @@ class DeletionVectorTest extends PaimonSparkTestBase {
           checkAnswer(
             spark.sql(s"SELECT * from T ORDER BY id"),
             Row(1, "a") :: Row(2, "bb") :: Row(3, "c") :: Row(4, "d") :: Nil)
-          val deletionVectors3 = getLatestDeletionVectors(table, 
dvMaintainerFactory)
+          val deletionVectors3 = getAllLatestDeletionVectors(table, 
dvMaintainerFactory)
           Assertions.assertTrue(deletionVectors2 == deletionVectors3)
 
           val cond2 = "id % 2 = 1"
           spark.sql(s"DELETE FROM T WHERE $cond2")
           checkAnswer(spark.sql(s"SELECT * from T ORDER BY id"), Row(2, "bb") 
:: Row(4, "d") :: Nil)
+
+          spark.sql("CALL sys.compact('T')")
+          val deletionVectors4 = getAllLatestDeletionVectors(table, 
dvMaintainerFactory)
+          // After compaction, deletionVectors should be empty
+          Assertions.assertTrue(deletionVectors4.isEmpty)
+          checkAnswer(spark.sql(s"SELECT * from T ORDER BY id"), Row(2, "bb") 
:: Row(4, "d") :: Nil)
         }
       }
   }
@@ -243,9 +270,16 @@ class DeletionVectorTest extends PaimonSparkTestBase {
           val dvMaintainerFactory =
             new 
DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler())
 
+          def getDeletionVectors(ptValues: Seq[String]): Map[String, 
DeletionVector] = {
+            getLatestDeletionVectors(
+              table,
+              dvMaintainerFactory,
+              ptValues.map(BinaryRow.singleColumn))
+          }
+
           spark.sql(
             "INSERT INTO T VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c', 
'2025'), (4, 'd', '2025')")
-          val deletionVectors1 = getLatestDeletionVectors(table, 
dvMaintainerFactory)
+          val deletionVectors1 = getAllLatestDeletionVectors(table, 
dvMaintainerFactory)
           Assertions.assertEquals(0, deletionVectors1.size)
 
           val cond1 = "id = 2"
@@ -254,11 +288,7 @@ class DeletionVectorTest extends PaimonSparkTestBase {
           checkAnswer(
             spark.sql(s"SELECT * from T ORDER BY id"),
             Row(1, "a", "2024") :: Row(3, "c", "2025") :: Row(4, "d", "2025") 
:: Nil)
-          val deletionVectors2 =
-            getLatestDeletionVectors(
-              table,
-              dvMaintainerFactory,
-              Seq(BinaryRow.singleColumn("2024"), 
BinaryRow.singleColumn("2025")))
+          val deletionVectors2 = getDeletionVectors(Seq("2024", "2025"))
           Assertions.assertEquals(1, deletionVectors2.size)
           deletionVectors2
             .foreach {
@@ -272,22 +302,28 @@ class DeletionVectorTest extends PaimonSparkTestBase {
           checkAnswer(
             spark.sql(s"SELECT * from T ORDER BY id"),
             Row(1, "a", "2024") :: Row(4, "d", "2025") :: Nil)
-          val deletionVectors3 =
-            getLatestDeletionVectors(
-              table,
-              dvMaintainerFactory,
-              Seq(BinaryRow.singleColumn("2024")))
+          val deletionVectors3 = getDeletionVectors(Seq("2024"))
           Assertions.assertTrue(deletionVectors2 == deletionVectors3)
-          val deletionVectors4 =
-            getLatestDeletionVectors(
-              table,
-              dvMaintainerFactory,
-              Seq(BinaryRow.singleColumn("2024"), 
BinaryRow.singleColumn("2025")))
+          val deletionVectors4 = getDeletionVectors(Seq("2024", "2025"))
           deletionVectors4
             .foreach {
               case (filePath, dv) =>
                 rowMetaInfo2(filePath).foreach(index => 
Assertions.assertTrue(dv.isDeleted(index)))
             }
+
+          spark.sql("""CALL sys.compact(table => 'T', partitions => "pt = 
'2024'")""")
+          Assertions.assertTrue(getDeletionVectors(Seq("2024")).isEmpty)
+          Assertions.assertTrue(getDeletionVectors(Seq("2025")).nonEmpty)
+          checkAnswer(
+            spark.sql(s"SELECT * from T ORDER BY id"),
+            Row(1, "a", "2024") :: Row(4, "d", "2025") :: Nil)
+
+          spark.sql("""CALL sys.compact(table => 'T', where => "pt = 
'2025'")""")
+          Assertions.assertTrue(getDeletionVectors(Seq("2025")).isEmpty)
+          Assertions.assertTrue(getDeletionVectors(Seq("2024")).isEmpty)
+          checkAnswer(
+            spark.sql(s"SELECT * from T ORDER BY id"),
+            Row(1, "a", "2024") :: Row(4, "d", "2025") :: Nil)
         }
       }
   }
@@ -317,7 +353,7 @@ class DeletionVectorTest extends PaimonSparkTestBase {
       val dvMaintainerFactory =
         new 
DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler())
 
-      val deletionVectors1 = getLatestDeletionVectors(table, 
dvMaintainerFactory)
+      val deletionVectors1 = getAllLatestDeletionVectors(table, 
dvMaintainerFactory)
       // 1, 3 deleted, their row positions are 0, 2
       Assertions.assertEquals(1, deletionVectors1.size)
       deletionVectors1
@@ -330,7 +366,7 @@ class DeletionVectorTest extends PaimonSparkTestBase {
       // Compact
       // f3 (1, 2, 3), no deletion
       spark.sql("CALL sys.compact('T')")
-      val deletionVectors2 = getLatestDeletionVectors(table, 
dvMaintainerFactory)
+      val deletionVectors2 = getAllLatestDeletionVectors(table, 
dvMaintainerFactory)
       // After compaction, deletionVectors should be empty
       Assertions.assertTrue(deletionVectors2.isEmpty)
 
@@ -342,7 +378,7 @@ class DeletionVectorTest extends PaimonSparkTestBase {
         spark.sql(s"SELECT * from T ORDER BY id"),
         Row(1, "a_new1") :: Row(2, "b_new2") :: Row(3, "c_new1") :: Nil)
 
-      val deletionVectors3 = getLatestDeletionVectors(table, 
dvMaintainerFactory)
+      val deletionVectors3 = getAllLatestDeletionVectors(table, 
dvMaintainerFactory)
       // 2 deleted, row positions is 1
       Assertions.assertEquals(1, deletionVectors3.size)
       deletionVectors3
@@ -488,7 +524,7 @@ class DeletionVectorTest extends PaimonSparkTestBase {
     new Path(path).getName
   }
 
-  private def getLatestDeletionVectors(
+  private def getAllLatestDeletionVectors(
       table: FileStoreTable,
       dvMaintainerFactory: DeletionVectorsMaintainer.Factory): Map[String, 
DeletionVector] = {
     getLatestDeletionVectors(table, dvMaintainerFactory, 
Seq(BinaryRow.EMPTY_ROW))

Reply via email to