This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 5caa6f6fd78b12762ac68fe5f8fed7b4c1470793
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Oct 9 12:58:30 2025 +0800

    [core] Add dv conflict detection during commit (#6303)
---
 .../java/org/apache/paimon/AbstractFileStore.java  |   5 +-
 .../java/org/apache/paimon/manifest/FileEntry.java |   6 +-
 .../apache/paimon/manifest/SimpleFileEntry.java    |  20 ++
 .../paimon/manifest/SimpleFileEntryWithDV.java     | 124 ++++++++
 .../paimon/operation/FileStoreCommitImpl.java      | 154 +++++++---
 .../apache/paimon/utils/ConflictDeletionUtils.java | 149 +++++++++
 .../org/apache/paimon/TestAppendFileStore.java     |  35 +++
 .../deletionvectors/BucketedDvMaintainerTest.java  |  87 ++++--
 .../paimon/operation/FileStoreCommitTest.java      |  26 +-
 .../paimon/utils/ConflictDeletionUtilsTest.java    | 342 +++++++++++++++++++++
 .../commands/DeleteFromPaimonTableCommand.scala    |   3 +-
 .../spark/commands/MergeIntoPaimonTable.scala      |   4 +-
 .../paimon/spark/commands/PaimonSparkWriter.scala  |   7 +-
 .../spark/commands/UpdatePaimonTableCommand.scala  |   3 +-
 .../paimon/spark/sql/MergeIntoTableTestBase.scala  |  71 +++--
 .../paimon/spark/sql/UpdateTableTestBase.scala     |  39 +++
 16 files changed, 955 insertions(+), 120 deletions(-)
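
For context, the conflict this patch detects arises when two writers build deletion vectors (dv) for the same data file against the same base snapshot. A hedged sketch of the failing sequence (writer, file, and dv names are illustrative):

    // Writer A: reads snapshot S, where data file f1 carries dv1; commits dv2 replacing dv1.
    // Writer B: also reads snapshot S and tries to commit dv3 replacing dv1.
    // Previously dv checks were an open TODO in noConflictsOrFail, so B could overwrite
    // A's deletes. With this patch, B's delta <DELETE f1, dv1> no longer matches the new
    // base <ADD f1, dv2>, and B's commit fails with a file deletion conflict.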

diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index f4a77f877d..bbeb0aa150 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -298,7 +298,10 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
                 options.commitMinRetryWait(),
                 options.commitMaxRetryWait(),
                 options.commitStrictModeLastSafeSnapshot().orElse(null),
-                options.rowTrackingEnabled());
+                options.rowTrackingEnabled(),
+                !schema.primaryKeys().isEmpty(),
+                options.deletionVectorsEnabled(),
+                newIndexFileHandler());
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
index 52b029563e..b400258b84 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
@@ -22,7 +22,6 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.Filter;
-import org.apache.paimon.utils.Preconditions;
 
 import javax.annotation.Nullable;
 
@@ -40,6 +39,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn;
 import static org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute;
+import static org.apache.paimon.utils.Preconditions.checkState;
 
 /** Entry representing a file. */
 public interface FileEntry {
@@ -77,7 +77,7 @@ public interface FileEntry {
         public final int level;
         public final String fileName;
         public final List<String> extraFiles;
-        @Nullable private final byte[] embeddedIndex;
+        @Nullable public final byte[] embeddedIndex;
         @Nullable public final String externalPath;
 
         /* Cache the hash code for the string */
@@ -190,7 +190,7 @@ public interface FileEntry {
             Identifier identifier = entry.identifier();
             switch (entry.kind()) {
                 case ADD:
-                    Preconditions.checkState(
+                    checkState(
                             !map.containsKey(identifier),
                             "Trying to add file %s which is already added.",
                             identifier);
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
index 14ae43c349..e0da3c8d53 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
@@ -81,6 +81,21 @@ public class SimpleFileEntry implements FileEntry {
                 entry.externalPath());
     }
 
+    public SimpleFileEntry toDelete() {
+        return new SimpleFileEntry(
+                FileKind.DELETE,
+                partition,
+                bucket,
+                totalBuckets,
+                level,
+                fileName,
+                extraFiles,
+                embeddedIndex,
+                minKey,
+                maxKey,
+                externalPath);
+    }
+
     public static List<SimpleFileEntry> from(List<ManifestEntry> entries) {
         return entries.stream().map(SimpleFileEntry::from).collect(Collectors.toList());
     }
@@ -115,6 +130,11 @@ public class SimpleFileEntry implements FileEntry {
         return fileName;
     }
 
+    @Nullable
+    public byte[] embeddedIndex() {
+        return embeddedIndex;
+    }
+
     @Nullable
     @Override
     public String externalPath() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntryWithDV.java b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntryWithDV.java
new file mode 100644
index 0000000000..75d73f345f
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntryWithDV.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.manifest;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+/** A {@link FileEntry} that combines a {@link SimpleFileEntry} with its dv file name. */
+public class SimpleFileEntryWithDV extends SimpleFileEntry {
+
+    @Nullable private final String dvFileName;
+
+    public SimpleFileEntryWithDV(SimpleFileEntry entry, @Nullable String dvFileName) {
+        super(
+                entry.kind(),
+                entry.partition(),
+                entry.bucket(),
+                entry.totalBuckets(),
+                entry.level(),
+                entry.fileName(),
+                entry.extraFiles(),
+                entry.embeddedIndex(),
+                entry.minKey(),
+                entry.maxKey(),
+                entry.externalPath());
+        this.dvFileName = dvFileName;
+    }
+
+    public Identifier identifier() {
+        return new IdentifierWithDv(super.identifier(), dvFileName);
+    }
+
+    @Nullable
+    public String dvFileName() {
+        return dvFileName;
+    }
+
+    public SimpleFileEntry toDelete() {
+        return new SimpleFileEntryWithDV(super.toDelete(), dvFileName);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        SimpleFileEntryWithDV that = (SimpleFileEntryWithDV) o;
+        return Objects.equals(dvFileName, that.dvFileName);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), dvFileName);
+    }
+
+    @Override
+    public String toString() {
+        return super.toString() + ", {dvFileName=" + dvFileName + '}';
+    }
+
+    /**
+     * The same {@link Identifier} indicates that the {@link ManifestEntry} refers to the same data
+     * file and the same deletion vector file.
+     */
+    static class IdentifierWithDv extends Identifier {
+
+        private final String dvFileName;
+
+        public IdentifierWithDv(Identifier identifier, String dvFileName) {
+            super(
+                    identifier.partition,
+                    identifier.bucket,
+                    identifier.level,
+                    identifier.fileName,
+                    identifier.extraFiles,
+                    identifier.embeddedIndex,
+                    identifier.externalPath);
+            this.dvFileName = dvFileName;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            if (!super.equals(o)) {
+                return false;
+            }
+            IdentifierWithDv that = (IdentifierWithDv) o;
+            return Objects.equals(dvFileName, that.dvFileName);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(super.hashCode(), dvFileName);
+        }
+    }
+}
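
A minimal sketch (not from the patch; entry construction elided) of how the dv-aware identity behaves:

    // Wrapping the same ADD entry for data file "f1" with different dv file names yields
    // different identifiers, so FileEntry.mergeEntries keys them separately:
    SimpleFileEntry base = ...; // an ADD entry for data file "f1"
    FileEntry.Identifier withDv1 = new SimpleFileEntryWithDV(base, "dv1").identifier();
    FileEntry.Identifier withDv2 = new SimpleFileEntryWithDV(base, "dv2").identifier();
    // withDv1.equals(withDv2) -> false
    // withDv1.equals(new SimpleFileEntryWithDV(base, "dv1").identifier()) -> true
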
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 466df201a3..7df1589781 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -26,6 +26,7 @@ import org.apache.paimon.catalog.SnapshotCommit;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.manifest.FileEntry;
@@ -61,7 +62,6 @@ import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.IOUtils;
 import org.apache.paimon.utils.InternalRowPartitionComputer;
 import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SnapshotManager;
 
 import org.slf4j.Logger;
@@ -70,7 +70,6 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -93,8 +92,11 @@ import static org.apache.paimon.manifest.ManifestEntry.recordCountAdd;
 import static org.apache.paimon.manifest.ManifestEntry.recordCountDelete;
 import static org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions;
 import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
+import static org.apache.paimon.utils.ConflictDeletionUtils.buildBaseEntriesWithDV;
+import static org.apache.paimon.utils.ConflictDeletionUtils.buildDeltaEntriesWithDV;
 import static org.apache.paimon.utils.InternalRowPartitionComputer.partToSimpleString;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkState;
 
 /**
  * Default implementation of {@link FileStoreCommit}.
@@ -151,6 +153,9 @@ public class FileStoreCommitImpl implements FileStoreCommit {
     @Nullable private Long strictModeLastSafeSnapshot;
     private final InternalRowPartitionComputer partitionComputer;
     private final boolean rowTrackingEnabled;
+    private final boolean isPkTable;
+    private final boolean deletionVectorsEnabled;
+    private final IndexFileHandler indexFileHandler;
 
     private boolean ignoreEmptyCommit;
     private CommitMetrics commitMetrics;
@@ -187,7 +192,10 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             long commitMinRetryWait,
             long commitMaxRetryWait,
             @Nullable Long strictModeLastSafeSnapshot,
-            boolean rowTrackingEnabled) {
+            boolean rowTrackingEnabled,
+            boolean isPkTable,
+            boolean deletionVectorsEnabled,
+            IndexFileHandler indexFileHandler) {
         this.snapshotCommit = snapshotCommit;
         this.fileIO = fileIO;
         this.schemaManager = schemaManager;
@@ -231,6 +239,9 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         this.statsFileHandler = statsFileHandler;
         this.bucketMode = bucketMode;
         this.rowTrackingEnabled = rowTrackingEnabled;
+        this.isPkTable = isPkTable;
+        this.deletionVectorsEnabled = deletionVectorsEnabled;
+        this.indexFileHandler = indexFileHandler;
     }
 
     @Override
@@ -333,11 +344,16 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                     // so we need to contain all changes
                     baseEntries.addAll(
                             readAllEntriesFromChangedPartitions(
-                                    latestSnapshot, appendTableFiles, compactTableFiles));
+                                    latestSnapshot,
+                                    changedPartitions(
+                                            appendTableFiles,
+                                            compactTableFiles,
+                                            appendIndexFiles)));
                     noConflictsOrFail(
-                            latestSnapshot.commitUser(),
+                            latestSnapshot,
                             baseEntries,
                             appendSimpleEntries,
+                            appendIndexFiles,
                             commitKind);
                     safeLatestSnapshotId = latestSnapshot.id();
                 }
@@ -370,9 +386,10 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                 if (safeLatestSnapshotId != null) {
                     baseEntries.addAll(appendSimpleEntries);
                     noConflictsOrFail(
-                            latestSnapshot.commitUser(),
+                            latestSnapshot,
                             baseEntries,
                             SimpleFileEntry.from(compactTableFiles),
+                            compactIndexFiles,
                             CommitKind.COMPACT);
                     // assume this compact commit follows just after the append commit created above
                     safeLatestSnapshotId += 1;
@@ -1003,10 +1020,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             // latestSnapshotId is different from the snapshot id we've checked for conflicts,
             // so we have to check again
             List<BinaryRow> changedPartitions =
-                    deltaFiles.stream()
-                            .map(ManifestEntry::partition)
-                            .distinct()
-                            .collect(Collectors.toList());
+                    changedPartitions(deltaFiles, Collections.emptyList(), indexFiles);
             if (retryResult != null && retryResult.latestSnapshot != null) {
                 baseDataFiles = new ArrayList<>(retryResult.baseDataFiles);
                 List<SimpleFileEntry> incremental =
@@ -1021,9 +1035,10 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                         readAllEntriesFromChangedPartitions(latestSnapshot, changedPartitions);
             }
             noConflictsOrFail(
-                    latestSnapshot.commitUser(),
+                    latestSnapshot,
                     baseDataFiles,
                     SimpleFileEntry.from(deltaFiles),
+                    indexFiles,
                     commitKind);
         }
 
@@ -1351,16 +1366,23 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         return entries;
     }
 
-    @SafeVarargs
-    private final List<SimpleFileEntry> readAllEntriesFromChangedPartitions(
-            Snapshot snapshot, List<ManifestEntry>... changes) {
-        List<BinaryRow> changedPartitions =
-                Arrays.stream(changes)
-                        .flatMap(Collection::stream)
-                        .map(ManifestEntry::partition)
-                        .distinct()
-                        .collect(Collectors.toList());
-        return readAllEntriesFromChangedPartitions(snapshot, changedPartitions);
+    private List<BinaryRow> changedPartitions(
+            List<ManifestEntry> appendTableFiles,
+            List<ManifestEntry> compactTableFiles,
+            List<IndexManifestEntry> appendIndexFiles) {
+        Set<BinaryRow> changedPartitions = new HashSet<>();
+        for (ManifestEntry appendTableFile : appendTableFiles) {
+            changedPartitions.add(appendTableFile.partition());
+        }
+        for (ManifestEntry compactTableFile : compactTableFiles) {
+            changedPartitions.add(compactTableFile.partition());
+        }
+        for (IndexManifestEntry appendIndexFile : appendIndexFiles) {
+            if (appendIndexFile.indexFile().indexType().equals(DELETION_VECTORS_INDEX)) {
+                changedPartitions.add(appendIndexFile.partition());
+            }
+        }
+        return new ArrayList<>(changedPartitions);
     }
 
     private List<SimpleFileEntry> readAllEntriesFromChangedPartitions(
@@ -1376,12 +1398,35 @@ public class FileStoreCommitImpl implements FileStoreCommit {
     }
 
     private void noConflictsOrFail(
-            String baseCommitUser,
+            Snapshot snapshot,
             List<SimpleFileEntry> baseEntries,
-            List<SimpleFileEntry> changes,
+            List<SimpleFileEntry> deltaEntries,
+            List<IndexManifestEntry> deltaIndexEntries,
             CommitKind commitKind) {
+        String baseCommitUser = snapshot.commitUser();
+        if (checkForDeletionVector(commitKind)) {
+            // Enrich dvName in each fileEntry so the checker can pair a base ADD dv
+            // with a delta DELETE dv. For example:
+            // If the base file is <ADD baseFile1, ADD dv1>,
+            // then the delta file must be <DELETE deltaFile1, DELETE dv1>; and vice versa,
+            // if the delta file is <DELETE deltaFile2, DELETE dv2>,
+            // then the base file must be <ADD baseFile2, ADD dv2>.
+            try {
+                baseEntries =
+                        buildBaseEntriesWithDV(
+                                baseEntries,
+                                snapshot.indexManifest() == null
+                                        ? Collections.emptyList()
+                                        : indexFileHandler.readManifest(snapshot.indexManifest()));
+                deltaEntries =
+                        buildDeltaEntriesWithDV(baseEntries, deltaEntries, deltaIndexEntries);
+            } catch (Throwable e) {
+                throw conflictException(baseCommitUser, baseEntries, deltaEntries).apply(e);
+            }
+        }
+
         List<SimpleFileEntry> allEntries = new ArrayList<>(baseEntries);
-        allEntries.addAll(changes);
+        allEntries.addAll(deltaEntries);
 
         if (commitKind != CommitKind.OVERWRITE) {
             // total buckets within the same partition should remain the same
@@ -1412,37 +1457,22 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                                         + " without overwrite. Give up committing.",
                                 baseCommitUser,
                                 baseEntries,
-                                changes,
+                                deltaEntries,
                                 null);
                 LOG.warn("", conflictException.getLeft());
                 throw conflictException.getRight();
             }
         }
 
-        Function<Throwable, RuntimeException> exceptionFunction =
-                e -> {
-                    Pair<RuntimeException, RuntimeException> conflictException =
-                            createConflictException(
-                                    "File deletion conflicts detected! Give up committing.",
-                                    baseCommitUser,
-                                    baseEntries,
-                                    changes,
-                                    e);
-                    LOG.warn("", conflictException.getLeft());
-                    return conflictException.getRight();
-                };
-
         Collection<SimpleFileEntry> mergedEntries;
         try {
             // merge manifest entries and also check if the files we want to delete are still there
             mergedEntries = FileEntry.mergeEntries(allEntries);
         } catch (Throwable e) {
-            throw exceptionFunction.apply(e);
+            throw conflictException(baseCommitUser, baseEntries, deltaEntries).apply(e);
         }
 
-        assertNoDelete(mergedEntries, exceptionFunction);
-
-        // TODO check for deletion vectors
+        assertNoDelete(mergedEntries, conflictException(baseCommitUser, baseEntries, deltaEntries));
 
         // fast exit for file store without keys
         if (keyComparator == null) {
@@ -1476,7 +1506,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                                             + b.identifier().toString(pathFactory),
                                     baseCommitUser,
                                     baseEntries,
-                                    changes,
+                                    deltaEntries,
                                     null);
 
                     LOG.warn("", conflictException.getLeft());
@@ -1486,12 +1516,48 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
         }
     }
 
+    private Function<Throwable, RuntimeException> conflictException(
+            String baseCommitUser,
+            List<SimpleFileEntry> baseEntries,
+            List<SimpleFileEntry> deltaEntries) {
+        return e -> {
+            Pair<RuntimeException, RuntimeException> conflictException =
+                    createConflictException(
+                            "File deletion conflicts detected! Give up committing.",
+                            baseCommitUser,
+                            baseEntries,
+                            deltaEntries,
+                            e);
+            LOG.warn("", conflictException.getLeft());
+            return conflictException.getRight();
+        };
+    }
+
+    private boolean checkForDeletionVector(CommitKind commitKind) {
+        if (!deletionVectorsEnabled) {
+            return false;
+        }
+
+        // TODO: add these cases once the dv index can contain DELETE type.
+        // A PK table's compact dv index only contains ADD type, so skip conflict detection.
+        if (isPkTable && commitKind == CommitKind.COMPACT) {
+            return false;
+        }
+
+        // A non-PK table's hash-fixed bucket mode only contains ADD type, so skip conflict detection.
+        if (!isPkTable && bucketMode.equals(BucketMode.HASH_FIXED)) {
+            return false;
+        }
+
+        return true;
+    }
+
     private void assertNoDelete(
             Collection<SimpleFileEntry> mergedEntries,
             Function<Throwable, RuntimeException> exceptionFunction) {
         try {
             for (SimpleFileEntry entry : mergedEntries) {
-                Preconditions.checkState(
+                checkState(
                         entry.kind() != FileKind.DELETE,
                         "Trying to delete file %s for table %s which is not 
previously added.",
                         entry.fileName(),
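
To make the pairing rule described in noConflictsOrFail above concrete, here is a hedged walkthrough (file and dv names are illustrative, not from the patch) of how the enriched entries behave under FileEntry.mergeEntries:

    // Base (latest snapshot): <ADD f1, dv1>
    // Delta (this commit):    <DELETE f1, dv1>, <ADD f1, dv2>
    // Merged result:          <ADD f1, dv2>  -> commit proceeds.
    //
    // If a concurrent commit already replaced dv1 with dv3, the base becomes
    // <ADD f1, dv3>; the delta's <DELETE f1, dv1> then has no matching ADD,
    // mergeEntries leaves a dangling DELETE, and assertNoDelete raises the
    // "Trying to delete file ... which is not previously added." conflict.
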
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ConflictDeletionUtils.java b/paimon-core/src/main/java/org/apache/paimon/utils/ConflictDeletionUtils.java
new file mode 100644
index 0000000000..00942ea048
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ConflictDeletionUtils.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.index.DeletionVectorMeta;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.SimpleFileEntry;
+import org.apache.paimon.manifest.SimpleFileEntryWithDV;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.paimon.utils.Preconditions.checkState;
+
+/** Utils for conflict deletion. */
+public class ConflictDeletionUtils {
+
+    public static List<SimpleFileEntry> buildBaseEntriesWithDV(
+            List<SimpleFileEntry> baseEntries, List<IndexManifestEntry> baseIndexEntries) {
+        if (baseEntries.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        Map<String, String> fileNameToDVFileName = new HashMap<>();
+        for (IndexManifestEntry indexManifestEntry : baseIndexEntries) {
+            // Do not attach a DELETE type dv index to base files.
+            if (!indexManifestEntry.kind().equals(FileKind.DELETE)) {
+                IndexFileMeta indexFile = indexManifestEntry.indexFile();
+                if (indexFile.dvRanges() != null) {
+                    for (DeletionVectorMeta value : indexFile.dvRanges().values()) {
+                        checkState(
+                                !fileNameToDVFileName.containsKey(value.dataFileName()),
+                                "One file should correspond to only one dv entry.");
+                        fileNameToDVFileName.put(value.dataFileName(), indexFile.fileName());
+                    }
+                }
+            }
+                    }
+                }
+            }
+        }
+
+        // Attach dv name to file entries.
+        List<SimpleFileEntry> entriesWithDV = new ArrayList<>(baseEntries.size());
+        for (SimpleFileEntry fileEntry : baseEntries) {
+            entriesWithDV.add(
+                    new SimpleFileEntryWithDV(
+                            fileEntry, fileNameToDVFileName.get(fileEntry.fileName())));
+        }
+        }
+        return entriesWithDV;
+    }
+
+    public static List<SimpleFileEntry> buildDeltaEntriesWithDV(
+            List<SimpleFileEntry> baseEntries,
+            List<SimpleFileEntry> deltaEntries,
+            List<IndexManifestEntry> deltaIndexEntries) {
+        if (deltaEntries.isEmpty() && deltaIndexEntries.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        List<SimpleFileEntry> entriesWithDV = new ArrayList<>(deltaEntries.size());
+
+        // One file may correspond to more than one dv entry, for example, deleting the old dv
+        // and creating a new one.
+        Map<String, List<IndexManifestEntry>> fileNameToDVEntry = new HashMap<>();
+        for (IndexManifestEntry deltaIndexEntry : deltaIndexEntries) {
+            if (deltaIndexEntry.indexFile().dvRanges() != null) {
+                for (DeletionVectorMeta meta : deltaIndexEntry.indexFile().dvRanges().values()) {
+                    fileNameToDVEntry.putIfAbsent(meta.dataFileName(), new ArrayList<>());
+                    fileNameToDVEntry.get(meta.dataFileName()).add(deltaIndexEntry);
+                }
+            }
+        }
+
+        Set<String> fileNotInDeltaEntries = new HashSet<>(fileNameToDVEntry.keySet());
+        // 1. Attach dv name to delta file entries.
+        for (SimpleFileEntry fileEntry : deltaEntries) {
+            if (fileNameToDVEntry.containsKey(fileEntry.fileName())) {
+                List<IndexManifestEntry> dvs = fileNameToDVEntry.get(fileEntry.fileName());
+                checkState(dvs.size() == 1, "A delta entry can only have one dv file.");
+                entriesWithDV.add(
+                        new SimpleFileEntryWithDV(fileEntry, dvs.get(0).indexFile().fileName()));
+                fileNotInDeltaEntries.remove(fileEntry.fileName());
+            } else {
+                entriesWithDV.add(new SimpleFileEntryWithDV(fileEntry, null));
+            }
+        }
+
+        // 2. For files not in delta entries, build entries with dv from baseEntries.
+        if (!fileNotInDeltaEntries.isEmpty()) {
+            Map<String, SimpleFileEntry> fileNameToFileEntry = new HashMap<>();
+            for (SimpleFileEntry baseEntry : baseEntries) {
+                if (baseEntry.kind().equals(FileKind.ADD)) {
+                    fileNameToFileEntry.put(baseEntry.fileName(), baseEntry);
+                }
+            }
+
+            for (String fileName : fileNotInDeltaEntries) {
+                SimpleFileEntryWithDV simpleFileEntry =
+                        (SimpleFileEntryWithDV) fileNameToFileEntry.get(fileName);
+                checkState(
+                        simpleFileEntry != null,
+                        String.format(
+                                "Trying to create deletion vector on file %s which is not previously added.",
+                                fileName));
+                List<IndexManifestEntry> dvEntries = fileNameToDVEntry.get(fileName);
+                // If the dv entry's type is DELETE, add DELETE<f, dv>.
+                // If the dv entry's type is ADD, add ADD<f, dv>.
+                for (IndexManifestEntry dvEntry : dvEntries) {
+                    entriesWithDV.add(
+                            new SimpleFileEntryWithDV(
+                                    dvEntry.kind().equals(FileKind.ADD)
+                                            ? simpleFileEntry
+                                            : simpleFileEntry.toDelete(),
+                                    dvEntry.indexFile().fileName()));
+                }
+
+                // If one file corresponds to only one dv entry and the type is ADD,
+                // we need to add a DELETE<f, null>.
+                // This happens when creating a dv for a file that didn't have one before.
+                if (dvEntries.size() == 1 && dvEntries.get(0).kind().equals(FileKind.ADD)) {
+                    entriesWithDV.add(new SimpleFileEntryWithDV(simpleFileEntry.toDelete(), null));
+                }
+            }
+        }
+
+        return entriesWithDV;
+    }
+}
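
A hedged usage sketch of these two helpers, mirroring how FileStoreCommitImpl.noConflictsOrFail wires them together (variable names are illustrative):

    // Attach each base file's dv name, then rewrite the delta so every dv change
    // shows up as an explicit ADD/DELETE pair keyed by <file, dv>:
    List<SimpleFileEntry> base =
            buildBaseEntriesWithDV(baseEntries, indexFileHandler.readManifest(indexManifest));
    List<SimpleFileEntry> delta = buildDeltaEntriesWithDV(base, deltaEntries, deltaIndexEntries);
    // base + delta can now go through FileEntry.mergeEntries; a delta deleting a dv that
    // is no longer present in base leaves a dangling DELETE, which fails the commit.
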
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
index 3ca33a8222..2754022dd7 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
@@ -29,6 +29,7 @@ import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataIncrement;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.operation.FileStoreCommitImpl;
@@ -36,6 +37,7 @@ import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.SchemaUtils;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.stats.SimpleStats;
 import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
@@ -43,6 +45,7 @@ import org.apache.paimon.table.source.DeletionFile;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.TraceableFileIO;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -142,6 +145,38 @@ public class TestAppendFileStore extends AppendOnlyFileStore {
         return factory.create(partition, bucket, indexFiles);
     }
 
+    public CommitMessageImpl writeDataFiles(
+            BinaryRow partition, int bucket, List<String> dataFileNames) throws IOException {
+        List<DataFileMeta> fileMetas = new ArrayList<>();
+        Path bucketPath = pathFactory().bucketPath(partition, bucket);
+        for (String dataFileName : dataFileNames) {
+            Path path = new Path(bucketPath, dataFileName);
+            fileIO.newOutputStream(path, false).close();
+            fileMetas.add(
+                    DataFileMeta.forAppend(
+                            path.getName(),
+                            10L,
+                            10L,
+                            SimpleStats.EMPTY_STATS,
+                            0L,
+                            0L,
+                            schema.id(),
+                            Collections.emptyList(),
+                            null,
+                            null,
+                            null,
+                            null,
+                            null,
+                            null));
+        }
+        return new CommitMessageImpl(
+                partition,
+                bucket,
+                options().bucket(),
+                new DataIncrement(fileMetas, Collections.emptyList(), Collections.emptyList()),
+                CompactIncrement.emptyIncrement());
+    }
+
     public CommitMessageImpl writeDVIndexFiles(
             BinaryRow partition, int bucket, Map<String, List<Integer>> dataFileToPositions) {
         BucketedDvMaintainer dvMaintainer = createOrRestoreDVMaintainer(partition, bucket);
diff --git a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java
index 10c6f45200..018d55e933 100644
--- a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java
@@ -23,16 +23,20 @@ import org.apache.paimon.Snapshot;
 import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
 import org.apache.paimon.compact.CompactDeletionFile;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataIncrement;
 import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.stats.SimpleStats;
 import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.utils.FileIOUtils;
 
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
@@ -40,6 +44,8 @@ import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -47,13 +53,22 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 import static java.util.Collections.emptyList;
-import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
 import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link BucketedDvMaintainer}. */
 public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase {
     private IndexFileHandler fileHandler;
+    private final BinaryRow partition = BinaryRow.singleColumn(1);
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        // write files
+        CommitMessageImpl commitMessage =
+                writeDataFiles(partition, 0, Arrays.asList("f1", "f2", "f3"));
+        BatchTableCommit commit = table.newBatchWriteBuilder().newCommit();
+        commit.commit(Collections.singletonList(commitMessage));
+    }
 
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
@@ -61,7 +76,7 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase {
         initIndexHandler(bitmap64);
 
         BucketedDvMaintainer.Factory factory = BucketedDvMaintainer.factory(fileHandler);
-        BucketedDvMaintainer dvMaintainer = factory.create(EMPTY_ROW, 0, emptyList());
+        BucketedDvMaintainer dvMaintainer = factory.create(partition, 0, emptyList());
         assertThat(dvMaintainer.bitmap64).isEqualTo(bitmap64);
 
         dvMaintainer.notifyNewDeletion("f1", 1);
@@ -74,7 +89,7 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase {
         IndexFileMeta file = dvMaintainer.writeDeletionVectorsIndex().get();
 
         Map<String, DeletionVector> deletionVectors =
-                fileHandler.readAllDeletionVectors(EMPTY_ROW, 0, Collections.singletonList(file));
+                fileHandler.readAllDeletionVectors(partition, 0, Collections.singletonList(file));
         assertThat(deletionVectors.get("f1").isDeleted(1)).isTrue();
         assertThat(deletionVectors.get("f1").isDeleted(2)).isFalse();
         assertThat(deletionVectors.get("f2").isDeleted(1)).isFalse();
@@ -89,7 +104,7 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase {
 
         BucketedDvMaintainer.Factory factory = BucketedDvMaintainer.factory(fileHandler);
 
-        BucketedDvMaintainer dvMaintainer = factory.create(EMPTY_ROW, 0, new HashMap<>());
+        BucketedDvMaintainer dvMaintainer = factory.create(partition, 0, new HashMap<>());
         DeletionVector deletionVector1 = createDeletionVector(bitmap64);
         deletionVector1.delete(1);
         deletionVector1.delete(3);
@@ -100,7 +115,7 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase {
         IndexFileMeta file = dvMaintainer.writeDeletionVectorsIndex().get();
         CommitMessage commitMessage =
                 new CommitMessageImpl(
-                        EMPTY_ROW,
+                        partition,
                         0,
                         1,
                         new DataIncrement(
@@ -115,8 +130,8 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase {
 
         Snapshot latestSnapshot = table.snapshotManager().latestSnapshot();
         List<IndexFileMeta> indexFiles =
-                fileHandler.scan(latestSnapshot, DELETION_VECTORS_INDEX, EMPTY_ROW, 0);
-        dvMaintainer = factory.create(EMPTY_ROW, 0, indexFiles);
+                fileHandler.scan(latestSnapshot, DELETION_VECTORS_INDEX, partition, 0);
+        dvMaintainer = factory.create(partition, 0, indexFiles);
         DeletionVector deletionVector2 = dvMaintainer.deletionVectorOf("f1").get();
         assertThat(deletionVector2.isDeleted(1)).isTrue();
         assertThat(deletionVector2.isDeleted(2)).isFalse();
@@ -127,7 +142,7 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase {
         file = dvMaintainer.writeDeletionVectorsIndex().get();
         commitMessage =
                 new CommitMessageImpl(
-                        EMPTY_ROW,
+                        partition,
                         0,
                         1,
                         new DataIncrement(
@@ -141,8 +156,8 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase {
         commit.commit(Collections.singletonList(commitMessage));
 
         latestSnapshot = table.snapshotManager().latestSnapshot();
-        indexFiles = fileHandler.scan(latestSnapshot, DELETION_VECTORS_INDEX, EMPTY_ROW, 0);
-        dvMaintainer = factory.create(EMPTY_ROW, 0, indexFiles);
+        indexFiles = fileHandler.scan(latestSnapshot, DELETION_VECTORS_INDEX, partition, 0);
+        dvMaintainer = factory.create(partition, 0, indexFiles);
         DeletionVector deletionVector3 = dvMaintainer.deletionVectorOf("f1").get();
         assertThat(deletionVector3.isDeleted(1)).isTrue();
         assertThat(deletionVector3.isDeleted(2)).isTrue();
@@ -154,7 +169,7 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase {
         initIndexHandler(bitmap64);
 
         BucketedDvMaintainer.Factory factory = BucketedDvMaintainer.factory(fileHandler);
-        BucketedDvMaintainer dvMaintainer = factory.create(EMPTY_ROW, 0, emptyList());
+        BucketedDvMaintainer dvMaintainer = factory.create(partition, 0, emptyList());
 
         File indexDir = new File(tempPath.toFile(), "/default.db/T/index");
 
@@ -195,7 +210,7 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase {
         // write first kind dv
         initIndexHandler(bitmap64);
         BucketedDvMaintainer.Factory factory1 = BucketedDvMaintainer.factory(fileHandler);
-        BucketedDvMaintainer dvMaintainer1 = factory1.create(EMPTY_ROW, 0, new HashMap<>());
+        BucketedDvMaintainer dvMaintainer1 = factory1.create(partition, 0, new HashMap<>());
         dvMaintainer1.notifyNewDeletion("f1", 1);
         dvMaintainer1.notifyNewDeletion("f1", 3);
         dvMaintainer1.notifyNewDeletion("f2", 1);
@@ -205,7 +220,7 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase {
         IndexFileMeta file = dvMaintainer1.writeDeletionVectorsIndex().get();
         CommitMessage commitMessage1 =
                 new CommitMessageImpl(
-                        EMPTY_ROW,
+                        partition,
                         0,
                         1,
                         DataIncrement.emptyIncrement(),
@@ -223,8 +238,8 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase {
         BucketedDvMaintainer.Factory factory2 = BucketedDvMaintainer.factory(fileHandler);
         List<IndexFileMeta> indexFiles =
                 fileHandler.scan(
-                        table.latestSnapshot().get(), DELETION_VECTORS_INDEX, EMPTY_ROW, 0);
-        BucketedDvMaintainer dvMaintainer2 = factory2.create(EMPTY_ROW, 0, indexFiles);
+                        table.latestSnapshot().get(), DELETION_VECTORS_INDEX, partition, 0);
+        BucketedDvMaintainer dvMaintainer2 = factory2.create(partition, 0, indexFiles);
         dvMaintainer2.notifyNewDeletion("f1", 10);
         dvMaintainer2.notifyNewDeletion("f3", 1);
         dvMaintainer2.notifyNewDeletion("f3", 3);
@@ -242,7 +257,7 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase {
         file = dvMaintainer2.writeDeletionVectorsIndex().get();
         CommitMessage commitMessage2 =
                 new CommitMessageImpl(
-                        EMPTY_ROW,
+                        partition,
                         0,
                         1,
                         DataIncrement.emptyIncrement(),
@@ -258,10 +273,10 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase {
         // test read dv index file which contains two kinds of dv
         Map<String, DeletionVector> readDvs =
                 fileHandler.readAllDeletionVectors(
-                        EMPTY_ROW,
+                        partition,
                         0,
                         fileHandler.scan(
-                                table.latestSnapshot().get(), "DELETION_VECTORS", EMPTY_ROW, 0));
+                                table.latestSnapshot().get(), "DELETION_VECTORS", partition, 0));
         assertThat(readDvs.size()).isEqualTo(3);
         assertThat(dvs.get("f1").getCardinality()).isEqualTo(3);
         assertThat(dvs.get("f2").getCardinality()).isEqualTo(2);
@@ -293,7 +308,39 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase {
                                 .map(IndexManifestEntry::indexFile)
                                 .collect(Collectors.toList());
         Map<String, DeletionVector> deletionVectors =
-                new HashMap<>(handler.readAllDeletionVectors(EMPTY_ROW, 0, indexFiles));
-        return factory.create(EMPTY_ROW, 0, deletionVectors);
+                new HashMap<>(handler.readAllDeletionVectors(partition, 0, indexFiles));
+        return factory.create(partition, 0, deletionVectors);
+    }
+
+    private CommitMessageImpl writeDataFiles(
+            BinaryRow partition, int bucket, List<String> dataFileNames) throws IOException {
+        List<DataFileMeta> fileMetas = new ArrayList<>();
+        Path bucketPath = table.store().pathFactory().bucketPath(partition, bucket);
+        for (String dataFileName : dataFileNames) {
+            Path path = new Path(bucketPath, dataFileName);
+            table.fileIO().newOutputStream(path, false).close();
+            fileMetas.add(
+                    DataFileMeta.forAppend(
+                            path.getName(),
+                            10L,
+                            10L,
+                            SimpleStats.EMPTY_STATS,
+                            0L,
+                            0L,
+                            table.schema().id(),
+                            Collections.emptyList(),
+                            null,
+                            null,
+                            null,
+                            null,
+                            null,
+                            null));
+        }
+        return new CommitMessageImpl(
+                partition,
+                bucket,
+                null,
+                new DataIncrement(fileMetas, Collections.emptyList(), Collections.emptyList()),
+                CompactIncrement.emptyIncrement());
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index 35843e0c2f..505417a412 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -886,23 +886,25 @@ public class FileStoreCommitTest {
         Map<String, String> options = new HashMap<>();
         options.put(CoreOptions.DELETION_VECTOR_BITMAP64.key(), String.valueOf(bitmap64));
         TestAppendFileStore store = TestAppendFileStore.createAppendStore(tempDir, options);
+        BinaryRow partition = gen.getPartition(gen.next());
+
+        // create files
+        CommitMessageImpl commitMessage0 =
+                store.writeDataFiles(partition, 0, Arrays.asList("f1", "f2"));
+        store.commit(commitMessage0);
 
         // commit 1
         CommitMessageImpl commitMessage1 =
                 store.writeDVIndexFiles(
-                        BinaryRow.EMPTY_ROW,
-                        0,
-                        Collections.singletonMap("f1", Arrays.asList(1, 3)));
+                        partition, 0, Collections.singletonMap("f1", 
Arrays.asList(1, 3)));
         CommitMessageImpl commitMessage2 =
                 store.writeDVIndexFiles(
-                        BinaryRow.EMPTY_ROW,
-                        0,
-                        Collections.singletonMap("f2", Arrays.asList(2, 4)));
+                        partition, 0, Collections.singletonMap("f2", 
Arrays.asList(2, 4)));
         store.commit(commitMessage1, commitMessage2);
 
         // assert 1
-        assertThat(store.scanDVIndexFiles(BinaryRow.EMPTY_ROW, 0).size()).isEqualTo(2);
-        BucketedDvMaintainer maintainer = store.createOrRestoreDVMaintainer(BinaryRow.EMPTY_ROW, 0);
+        assertThat(store.scanDVIndexFiles(partition, 0).size()).isEqualTo(2);
+        BucketedDvMaintainer maintainer = store.createOrRestoreDVMaintainer(partition, 0);
         Map<String, DeletionVector> dvs = maintainer.deletionVectors();
         assertThat(dvs.size()).isEqualTo(2);
         assertThat(dvs.get("f2").isDeleted(2)).isTrue();
@@ -912,16 +914,16 @@ public class FileStoreCommitTest {
         // commit 2
         CommitMessage commitMessage3 =
                 store.writeDVIndexFiles(
-                        BinaryRow.EMPTY_ROW, 0, Collections.singletonMap("f2", Arrays.asList(3)));
+                        partition, 0, Collections.singletonMap("f2", Arrays.asList(3)));
         List<IndexFileMeta> deleted =
                 new ArrayList<>(commitMessage1.newFilesIncrement().newIndexFiles());
         deleted.addAll(commitMessage2.newFilesIncrement().newIndexFiles());
-        CommitMessage commitMessage4 = store.removeIndexFiles(BinaryRow.EMPTY_ROW, 0, deleted);
+        CommitMessage commitMessage4 = store.removeIndexFiles(partition, 0, deleted);
         store.commit(commitMessage3, commitMessage4);
 
         // assert 2
-        assertThat(store.scanDVIndexFiles(BinaryRow.EMPTY_ROW, 0).size()).isEqualTo(1);
-        maintainer = store.createOrRestoreDVMaintainer(BinaryRow.EMPTY_ROW, 0);
+        assertThat(store.scanDVIndexFiles(partition, 0).size()).isEqualTo(1);
+        maintainer = store.createOrRestoreDVMaintainer(partition, 0);
         dvs = maintainer.deletionVectors();
         assertThat(dvs.size()).isEqualTo(2);
         assertThat(dvs.get("f1").isDeleted(3)).isTrue();
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/ConflictDeletionUtilsTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/ConflictDeletionUtilsTest.java
new file mode 100644
index 0000000000..34e6e59e7c
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/ConflictDeletionUtilsTest.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.index.DeletionVectorMeta;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.manifest.FileEntry;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.SimpleFileEntry;
+import org.apache.paimon.manifest.SimpleFileEntryWithDV;
+
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
+import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
+import static org.apache.paimon.manifest.FileKind.ADD;
+import static org.apache.paimon.manifest.FileKind.DELETE;
+import static org.apache.paimon.utils.ConflictDeletionUtils.buildBaseEntriesWithDV;
+import static org.apache.paimon.utils.ConflictDeletionUtils.buildDeltaEntriesWithDV;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link ConflictDeletionUtils}. */
+public class ConflictDeletionUtilsTest {
+
+    @Test
+    public void testBuildBaseEntriesWithDV() {
+        {
+            // Scene 1
+            List<SimpleFileEntry> baseEntries = new ArrayList<>();
+            baseEntries.add(createFileEntry("f1", ADD));
+            baseEntries.add(createFileEntry("f2", ADD));
+
+            List<IndexManifestEntry> deltaIndexEntries = new ArrayList<>();
+            deltaIndexEntries.add(createDvIndexEntry("dv1", ADD, Arrays.asList("f2")));
+
+            assertThat(buildBaseEntriesWithDV(baseEntries, deltaIndexEntries))
+                    .containsExactlyInAnyOrder(
+                            createFileEntryWithDV("f1", ADD, null),
+                            createFileEntryWithDV("f2", ADD, "dv1"));
+        }
+
+        {
+            // Scene 2: skip delete dv
+            List<SimpleFileEntry> baseEntries = new ArrayList<>();
+            baseEntries.add(createFileEntry("f1", ADD));
+            baseEntries.add(createFileEntry("f2", ADD));
+
+            List<IndexManifestEntry> deltaIndexEntries = new ArrayList<>();
+            deltaIndexEntries.add(createDvIndexEntry("dv1", DELETE, Arrays.asList("f2")));
+
+            assertThat(buildBaseEntriesWithDV(baseEntries, deltaIndexEntries))
+                    .containsExactlyInAnyOrder(
+                            createFileEntryWithDV("f1", ADD, null),
+                            createFileEntryWithDV("f2", ADD, null));
+        }
+    }
+
+    @Test
+    public void testBuildDeltaEntriesWithDV() {
+        {
+            // Scene 1: update f2's dv
+            List<SimpleFileEntry> baseEntries = new ArrayList<>();
+            baseEntries.add(createFileEntryWithDV("f1", ADD, "dv1"));
+            baseEntries.add(createFileEntryWithDV("f2", ADD, null));
+
+            List<SimpleFileEntry> deltaEntries = new ArrayList<>();
+            deltaEntries.add(createFileEntry("f2", DELETE));
+            deltaEntries.add(createFileEntry("f2_new", ADD));
+
+            List<IndexManifestEntry> deltaIndexEntries = new ArrayList<>();
+            deltaIndexEntries.add(createDvIndexEntry("dv2", ADD, Arrays.asList("f2_new")));
+
+            assertThat(buildDeltaEntriesWithDV(baseEntries, deltaEntries, deltaIndexEntries))
+                    .containsExactlyInAnyOrder(
+                            createFileEntryWithDV("f2", DELETE, null),
+                            createFileEntryWithDV("f2_new", ADD, "dv2"));
+        }
+
+        {
+            // Scene 2: update f2 and merge f1's dv
+            List<SimpleFileEntry> baseEntries = new ArrayList<>();
+            baseEntries.add(createFileEntryWithDV("f1", ADD, "dv1"));
+            baseEntries.add(createFileEntryWithDV("f2", ADD, null));
+
+            List<SimpleFileEntry> deltaEntries = new ArrayList<>();
+            deltaEntries.add(createFileEntry("f2", DELETE));
+            deltaEntries.add(createFileEntry("f2_new", ADD));
+            deltaEntries.add(createFileEntry("f3", ADD));
+
+            List<IndexManifestEntry> deltaIndexEntries = new ArrayList<>();
+            deltaIndexEntries.add(createDvIndexEntry("dv1", DELETE, Arrays.asList("f1")));
+            deltaIndexEntries.add(createDvIndexEntry("dv2", ADD, Arrays.asList("f1", "f2_new")));
+
+            assertThat(buildDeltaEntriesWithDV(baseEntries, deltaEntries, deltaIndexEntries))
+                    .containsExactlyInAnyOrder(
+                            createFileEntryWithDV("f1", DELETE, "dv1"),
+                            createFileEntryWithDV("f1", ADD, "dv2"),
+                            createFileEntryWithDV("f2", DELETE, null),
+                            createFileEntryWithDV("f2_new", ADD, "dv2"),
+                            createFileEntryWithDV("f3", ADD, null));
+        }
+
+        {
+            // Scene 3: update f2 (with dv) and merge f1's dv
+            List<SimpleFileEntry> baseEntries = new ArrayList<>();
+            baseEntries.add(createFileEntryWithDV("f1", ADD, "dv1"));
+            baseEntries.add(createFileEntryWithDV("f2", ADD, "dv2"));
+
+            List<SimpleFileEntry> deltaEntries = new ArrayList<>();
+            deltaEntries.add(createFileEntry("f2", DELETE));
+            deltaEntries.add(createFileEntry("f2_new", ADD));
+            deltaEntries.add(createFileEntry("f3", ADD));
+
+            List<IndexManifestEntry> deltaIndexEntries = new ArrayList<>();
+            deltaIndexEntries.add(createDvIndexEntry("dv1", DELETE, 
Arrays.asList("f1")));
+            deltaIndexEntries.add(createDvIndexEntry("dv2", DELETE, 
Arrays.asList("f2")));
+            deltaIndexEntries.add(createDvIndexEntry("dv3", ADD, 
Arrays.asList("f1", "f2_new")));
+
+            assertThat(buildDeltaEntriesWithDV(baseEntries, deltaEntries, deltaIndexEntries))
+                    .containsExactlyInAnyOrder(
+                            createFileEntryWithDV("f1", DELETE, "dv1"),
+                            createFileEntryWithDV("f1", ADD, "dv3"),
+                            createFileEntryWithDV("f2", DELETE, "dv2"),
+                            createFileEntryWithDV("f2_new", ADD, "dv3"),
+                            createFileEntryWithDV("f3", ADD, null));
+        }
+
+        {
+            // Scene 4: full compact
+            List<SimpleFileEntry> baseEntries = new ArrayList<>();
+            baseEntries.add(createFileEntryWithDV("f1", ADD, null));
+            baseEntries.add(createFileEntryWithDV("f2", ADD, "dv1"));
+            baseEntries.add(createFileEntryWithDV("f3", ADD, "dv1"));
+            baseEntries.add(createFileEntryWithDV("f4", ADD, "dv2"));
+
+            List<SimpleFileEntry> deltaEntries = new ArrayList<>();
+            deltaEntries.add(createFileEntry("f1", DELETE));
+            deltaEntries.add(createFileEntry("f2", DELETE));
+            deltaEntries.add(createFileEntry("f3", DELETE));
+            deltaEntries.add(createFileEntry("f4", DELETE));
+            deltaEntries.add(createFileEntry("f5_compact", ADD));
+
+            List<IndexManifestEntry> deltaIndexEntries = new ArrayList<>();
+            deltaIndexEntries.add(createDvIndexEntry("dv1", DELETE, 
Arrays.asList("f2", "f3")));
+            deltaIndexEntries.add(createDvIndexEntry("dv2", DELETE, 
Arrays.asList("f4")));
+
+            assertThat(buildDeltaEntriesWithDV(baseEntries, deltaEntries, deltaIndexEntries))
+                    .containsExactlyInAnyOrder(
+                            createFileEntryWithDV("f1", DELETE, null),
+                            createFileEntryWithDV("f2", DELETE, "dv1"),
+                            createFileEntryWithDV("f3", DELETE, "dv1"),
+                            createFileEntryWithDV("f4", DELETE, "dv2"),
+                            createFileEntryWithDV("f5_compact", ADD, null));
+        }
+
+        {
+            // Scene 5: MERGE INTO with updates, deletes and inserts
+            List<SimpleFileEntry> baseEntries = new ArrayList<>();
+            baseEntries.add(createFileEntryWithDV("f1", ADD, null));
+            baseEntries.add(createFileEntryWithDV("f2", ADD, null));
+            baseEntries.add(createFileEntryWithDV("f3", ADD, "dv1"));
+            baseEntries.add(createFileEntryWithDV("f4", ADD, "dv1"));
+            baseEntries.add(createFileEntryWithDV("f5", ADD, "dv2"));
+
+            List<SimpleFileEntry> deltaEntries = new ArrayList<>();
+            deltaEntries.add(createFileEntry("f2", DELETE));
+            deltaEntries.add(createFileEntry("f3", DELETE));
+            deltaEntries.add(createFileEntry("f3_new", ADD));
+            deltaEntries.add(createFileEntry("f7", ADD));
+
+            List<IndexManifestEntry> deltaIndexEntries = new ArrayList<>();
+            deltaIndexEntries.add(createDvIndexEntry("dv1", DELETE, 
Arrays.asList("f3", "f4")));
+            deltaIndexEntries.add(createDvIndexEntry("dv2", DELETE, 
Arrays.asList("f5")));
+            deltaIndexEntries.add(createDvIndexEntry("dv3", ADD, 
Arrays.asList("f1", "f4", "f5")));
+
+            assertThat(buildDeltaEntriesWithDV(baseEntries, deltaEntries, deltaIndexEntries))
+                    .containsExactlyInAnyOrder(
+                            createFileEntryWithDV("f1", DELETE, null),
+                            createFileEntryWithDV("f1", ADD, "dv3"),
+                            createFileEntryWithDV("f2", DELETE, null),
+                            createFileEntryWithDV("f3", DELETE, "dv1"),
+                            createFileEntryWithDV("f3_new", ADD, null),
+                            createFileEntryWithDV("f4", DELETE, "dv1"),
+                            createFileEntryWithDV("f4", ADD, "dv3"),
+                            createFileEntryWithDV("f5", DELETE, "dv2"),
+                            createFileEntryWithDV("f5", ADD, "dv3"),
+                            createFileEntryWithDV("f7", ADD, null));
+        }
+    }
+
+    @Test
+    public void testConflictDeletionWithDV() {
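+        // Each scene replays two writers branching from the same base: update1 commits
+        // first; update2, built without seeing update1, must then be detected as
+        // conflicting when its delta is merged onto update1's result.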
+        {
+            // Scene 1: base -------------> update2 (conflict)
+            //           f1          ^         <f1, +dv2>
+            //                       |
+            //                  update1 (finished)
+            //                    <f1, +dv1>
+            List<SimpleFileEntry> update1Entries = new ArrayList<>();
+            update1Entries.add(createFileEntryWithDV("f1", ADD, "dv1"));
+
+            List<SimpleFileEntry> update2DeltaEntries = new ArrayList<>();
+
+            List<IndexManifestEntry> update2DeltaIndexEntries = new ArrayList<>();
+            update2DeltaIndexEntries.add(createDvIndexEntry("dv2", ADD, Arrays.asList("f1")));
+
+            List<SimpleFileEntry> update2DeltaEntriesWithDV =
+                    buildDeltaEntriesWithDV(
+                            update1Entries, update2DeltaEntries, update2DeltaIndexEntries);
+            assertThat(update2DeltaEntriesWithDV)
+                    .containsExactlyInAnyOrder(
+                            createFileEntryWithDV("f1", DELETE, null),
+                            createFileEntryWithDV("f1", ADD, "dv2"));
+            assertConflict(update1Entries, update2DeltaEntriesWithDV);
+        }
+
+        {
+            // Scene 2: base -------------> update2 (conflict)
+            //         <f1, dv0>     ^        <f1, +dv2>
+            //                       |
+            //                  update1 (finished)
+            //                    <f1, +dv1>
+            List<SimpleFileEntry> update1Entries = new ArrayList<>();
+            update1Entries.add(createFileEntryWithDV("f1", ADD, "dv1"));
+
+            List<SimpleFileEntry> update2DeltaEntries = new ArrayList<>();
+
+            List<IndexManifestEntry> update2DeltaIndexEntries = new ArrayList<>();
+            update2DeltaIndexEntries.add(createDvIndexEntry("dv0", DELETE, Arrays.asList("f1")));
+            update2DeltaIndexEntries.add(createDvIndexEntry("dv2", ADD, Arrays.asList("f1")));
+
+            List<SimpleFileEntry> update2DeltaEntriesWithDV =
+                    buildDeltaEntriesWithDV(
+                            update1Entries, update2DeltaEntries, update2DeltaIndexEntries);
+            assertThat(update2DeltaEntriesWithDV)
+                    .containsExactlyInAnyOrder(
+                            createFileEntryWithDV("f1", DELETE, "dv0"),
+                            createFileEntryWithDV("f1", ADD, "dv2"));
+            assertConflict(update1Entries, update2DeltaEntriesWithDV);
+        }
+
+        {
+            // Scene 3: base -------------> update2 (conflict)
+            //         <f1, dv0>      ^     <-f1, -dv0>, <+f3, null>
+            //                        |
+            //                  update1 (finished)
+            //                 <-f1, -dv0>, <+f2, dv1>
+            List<SimpleFileEntry> update1Entries = new ArrayList<>();
+            update1Entries.add(createFileEntryWithDV("f2", ADD, "dv1"));
+
+            List<SimpleFileEntry> update2DeltaEntries = new ArrayList<>();
+            update2DeltaEntries.add(createFileEntry("f1", DELETE));
+            update2DeltaEntries.add(createFileEntry("f3", ADD));
+
+            List<IndexManifestEntry> update2DeltaIndexEntries = new ArrayList<>();
+            update2DeltaIndexEntries.add(createDvIndexEntry("dv0", DELETE, Arrays.asList("f1")));
+
+            List<SimpleFileEntry> update2DeltaEntriesWithDV =
+                    buildDeltaEntriesWithDV(
+                            update1Entries, update2DeltaEntries, update2DeltaIndexEntries);
+            assertThat(update2DeltaEntriesWithDV)
+                    .containsExactlyInAnyOrder(
+                            createFileEntryWithDV("f1", DELETE, "dv0"),
+                            createFileEntryWithDV("f3", ADD, null));
+            assertConflict(update1Entries, update2DeltaEntriesWithDV);
+        }
+    }
+
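+    /** Builds a minimal file entry; only the kind and file name matter for these tests. */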
+    private SimpleFileEntry createFileEntry(String fileName, FileKind kind) {
+        return new SimpleFileEntry(
+                kind,
+                EMPTY_ROW,
+                0,
+                1,
+                0,
+                fileName,
+                Collections.emptyList(),
+                null,
+                EMPTY_ROW,
+                EMPTY_ROW,
+                null);
+    }
+
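+    /** Pairs a plain file entry with the name of its deletion vector file, if any. */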
+    private SimpleFileEntryWithDV createFileEntryWithDV(
+            String fileName, FileKind kind, @Nullable String dvFileName) {
+        return new SimpleFileEntryWithDV(createFileEntry(fileName, kind), dvFileName);
+    }
+
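+    /** Builds a dv index manifest entry whose index file covers the given data files. */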
+    private IndexManifestEntry createDvIndexEntry(
+            String fileName, FileKind kind, List<String> fileNames) {
+        LinkedHashMap<String, DeletionVectorMeta> dvRanges = new LinkedHashMap<>();
+        for (String name : fileNames) {
+            dvRanges.put(name, new DeletionVectorMeta(name, 1, 1, 1L));
+        }
+        return new IndexManifestEntry(
+                kind,
+                EMPTY_ROW,
+                0,
+                new IndexFileMeta(
+                        DELETION_VECTORS_INDEX, fileName, 11, dvRanges.size(), dvRanges, null));
+    }
+
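+    /**
+     * Asserts that merging the two commits' entries leaves dangling DELETE entries,
+     * which commit-time conflict detection reports as a conflict.
+     */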
+    private void assertConflict(
+            List<SimpleFileEntry> baseEntries, List<SimpleFileEntry> deltaEntries) {
+        ArrayList<SimpleFileEntry> simpleFileEntryWithDVS = new ArrayList<>(baseEntries);
+        simpleFileEntryWithDVS.addAll(deltaEntries);
+        Collection<SimpleFileEntry> merged = FileEntry.mergeEntries(simpleFileEntryWithDVS);
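+        // mergeEntries cancels matching ADD/DELETE pairs, so any surviving DELETE
+        // references a <file, dv> state that the other commit has already rewritten.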
+        int deleteCount = 0;
+        for (SimpleFileEntry simpleFileEntryWithDV : merged) {
+            if (simpleFileEntryWithDV.kind().equals(FileKind.DELETE)) {
+                deleteCount++;
+            }
+        }
+        assertThat(deleteCount).isGreaterThan(0);
+    }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index c85315d876..461aca90ef 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -119,6 +119,7 @@ case class DeleteFromPaimonTableCommand(
   }
 
  private def performNonPrimaryKeyDelete(sparkSession: SparkSession): Seq[CommitMessage] = {
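+    // Pin the snapshot this delete reads, so the deletion vectors written below are
+    // committed and conflict-checked against that same snapshot.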
+    val readSnapshot = table.snapshotManager().latestSnapshot()
     // Step1: the candidate data splits which are filtered by Paimon Predicate.
    val candidateDataSplits = findCandidateDataSplits(condition, relation.output)
     val dataFilePathToMeta = candidateFileMap(candidateDataSplits)
@@ -133,7 +134,7 @@ case class DeleteFromPaimonTableCommand(
         sparkSession)
 
       // Step3: update the touched deletion vectors and index files
-      writer.persistDeletionVectors(deletionVectors)
+      writer.persistDeletionVectors(deletionVectors, readSnapshot)
     } else {
      // Step2: extract the exact files, which must have at least one record to be updated.
       val touchedFilePaths =
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
index 5f12d1110d..1ae894dbae 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
@@ -88,6 +88,8 @@ case class MergeIntoPaimonTable(
   }
 
  private def performMergeForNonPkTable(sparkSession: SparkSession): Seq[CommitMessage] = {
+    // TODO: find a more universal way to keep the read snapshot consistent.
+    val readSnapshot = table.snapshotManager().latestSnapshot()
     val targetDS = createDataset(sparkSession, filteredTargetPlan)
     val sourceDS = createDataset(sparkSession, sourceTable)
 
@@ -113,7 +115,7 @@ case class MergeIntoPaimonTable(
         val dvDS = ds.where(
           s"$ROW_KIND_COL = ${RowKind.DELETE.toByteValue} or $ROW_KIND_COL = 
${RowKind.UPDATE_AFTER.toByteValue}")
         val deletionVectors = collectDeletionVectors(dataFilePathToMeta, dvDS, 
sparkSession)
-        val indexCommitMsg = writer.persistDeletionVectors(deletionVectors)
+        val indexCommitMsg = writer.persistDeletionVectors(deletionVectors, 
readSnapshot)
 
         // Step4: filter rows that should be written as the inserted/updated 
data.
         val toWriteDS = ds
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index 246245c052..6d0563b364 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.spark.commands
 
-import org.apache.paimon.CoreOptions
+import org.apache.paimon.{CoreOptions, Snapshot}
 import org.apache.paimon.CoreOptions.{PartitionSinkStrategy, WRITE_ONLY}
 import org.apache.paimon.codegen.CodeGenUtils
 import org.apache.paimon.crosspartition.{IndexBootstrap, KeyPartOrRow}
@@ -304,10 +304,11 @@ case class PaimonSparkWriter(table: FileStoreTable, writeRowTracking: Boolean =
   * deletion vectors; else, one index file will contain all deletion vectors with the same partition
    * and bucket.
    */
-  def persistDeletionVectors(deletionVectors: Dataset[SparkDeletionVector]): Seq[CommitMessage] = {
+  def persistDeletionVectors(
+      deletionVectors: Dataset[SparkDeletionVector],
+      snapshot: Snapshot): Seq[CommitMessage] = {
     val sparkSession = deletionVectors.sparkSession
     import sparkSession.implicits._
-    val snapshot = table.snapshotManager().latestSnapshotFromFileSystem()
     val serializedCommits = deletionVectors
       .groupByKey(_.partitionAndBucket)
       .mapGroups {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
index 1babd7b5c3..8839d5c8ac 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
@@ -71,6 +71,7 @@ case class UpdatePaimonTableCommand(
 
   /** Update for table without primary keys */
  private def performUpdateForNonPkTable(sparkSession: SparkSession): Seq[CommitMessage] = {
+    val readSnapshot = table.snapshotManager().latestSnapshot()
     // Step1: the candidate data splits which are filtered by Paimon Predicate.
    val candidateDataSplits = findCandidateDataSplits(condition, relation.output)
     val dataFilePathToMeta = candidateFileMap(candidateDataSplits)
@@ -100,7 +101,7 @@ case class UpdatePaimonTableCommand(
          val addCommitMessage = writeOnlyUpdatedData(sparkSession, touchedDataSplits)
 
           // Step4: write these deletion vectors.
-          val indexCommitMsg = writer.persistDeletionVectors(deletionVectors)
+          val indexCommitMsg = writer.persistDeletionVectors(deletionVectors, readSnapshot)
 
           addCommitMessage ++ indexCommitMsg
         } finally {
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
index c867aed9f0..bfbadc7624 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
@@ -760,46 +760,49 @@ trait MergeIntoAppendTableTest extends PaimonSparkTestBase with PaimonAppendTabl
   }
 
   test("Paimon MergeInto: concurrent merge and compact") {
-    withTable("s", "t") {
-      sql("CREATE TABLE s (id INT, b INT, c INT)")
-      sql("INSERT INTO s VALUES (1, 1, 1)")
-
-      sql("CREATE TABLE t (id INT, b INT, c INT)")
-      sql("INSERT INTO t VALUES (1, 1, 1)")
-
-      val mergeInto = Future {
-        for (_ <- 1 to 10) {
-          try {
-            sql("""
-                  |MERGE INTO t
-                  |USING s
-                  |ON t.id = s.id
-                  |WHEN MATCHED THEN
-                  |UPDATE SET t.id = s.id, t.b = s.b + t.b, t.c = s.c + t.c
-                  |""".stripMargin)
-          } catch {
-            case a: Throwable =>
-              assert(
-                a.getMessage.contains("Conflicts during commits") || a.getMessage.contains(
-                  "Missing file"))
+    for (dvEnabled <- Seq("true", "false")) {
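+      // Run the concurrent merge/compact loop both with and without deletion vectors.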
+      withTable("s", "t") {
+        sql("CREATE TABLE s (id INT, b INT, c INT)")
+        sql("INSERT INTO s VALUES (1, 1, 1)")
+
+        sql(
+          s"CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES 
('deletion-vectors.enabled' = '$dvEnabled')")
+        sql("INSERT INTO t VALUES (1, 1, 1)")
+
+        val mergeInto = Future {
+          for (_ <- 1 to 10) {
+            try {
+              sql("""
+                    |MERGE INTO t
+                    |USING s
+                    |ON t.id = s.id
+                    |WHEN MATCHED THEN
+                    |UPDATE SET t.id = s.id, t.b = s.b + t.b, t.c = s.c + t.c
+                    |""".stripMargin)
+            } catch {
+              case a: Throwable =>
+                assert(
+                  a.getMessage.contains("Conflicts during commits") || a.getMessage.contains(
+                    "Missing file"))
+            }
+            checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(1)))
           }
-          checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(1)))
         }
-      }
 
-      val compact = Future {
-        for (_ <- 1 to 10) {
-          try {
-            sql("CALL sys.compact(table => 't', order_strategy => 'order', 
order_by => 'id')")
-          } catch {
-            case a: Throwable => assert(a.getMessage.contains("Conflicts during commits"))
+        val compact = Future {
+          for (_ <- 1 to 10) {
+            try {
+              sql("CALL sys.compact(table => 't', order_strategy => 'order', 
order_by => 'id')")
+            } catch {
+              case a: Throwable => assert(a.getMessage.contains("Conflicts during commits"))
+            }
+            checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(1)))
           }
-          checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(1)))
         }
-      }
 
-      Await.result(mergeInto, 60.seconds)
-      Await.result(compact, 60.seconds)
+        Await.result(mergeInto, 60.seconds)
+        Await.result(compact, 60.seconds)
+      }
     }
   }
 }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTestBase.scala
index 097d2c4e14..072324ce3a 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTestBase.scala
@@ -25,6 +25,11 @@ import org.apache.paimon.spark.catalyst.analysis.Update
 import org.apache.spark.sql.Row
 import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
 
+import scala.concurrent.{Await, Future}
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.DurationInt
+import scala.util.Random
+
 abstract class UpdateTableTestBase extends PaimonSparkTestBase {
 
   import testImplicits._
@@ -393,4 +398,38 @@ abstract class UpdateTableTestBase extends PaimonSparkTestBase {
       assert(latestSnapshot.commitKind.equals(Snapshot.CommitKind.COMPACT))
     }
   }
+
+  test("Paimon update: random concurrent update and dv table") {
+    withTable("t") {
+      val recordCount = 10000
+      val maxConcurrent = Random.nextInt(2) + 1
+
+      sql(s"CREATE TABLE t (a INT, b INT) TBLPROPERTIES 
('deletion-vectors.enabled' = 'true')")
+      sql(s"INSERT INTO t SELECT id AS a, 0 AS b FROM range(0, $recordCount)")
+
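+      // Each worker randomly mixes partial updates with normal and sort compactions;
+      // every attempt must either commit or fail with a detected conflict, and the
+      // row count must stay unchanged throughout.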
+      def run(): Future[Unit] = Future {
+        for (_ <- 1 to 20) {
+          try {
+            val i = 20 + Random.nextInt(100)
+            Random.nextInt(3) match {
+              case 0 => sql(s"UPDATE t SET b = b + 1 WHERE (a % $i) = ${Random.nextInt(i)}")
+              case 1 =>
+                sql("CALL sys.compact(table => 't', options => 
'compaction.min.file-num=1')")
+              case 2 =>
+                sql("CALL sys.compact(table => 't', order_strategy => 'order', 
order_by => 'a')")
+            }
+          } catch {
+            case a: Throwable => assert(a.getMessage.contains("Conflicts during commits"))
+          }
+          checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(recordCount)))
+        }
+      }
+
+      (1 to maxConcurrent).map(_ => run()).foreach(Await.result(_, 600.seconds))
+    }
+  }
 }
