This is an automated email from the ASF dual-hosted git repository.

biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c4762ccdd [core] Refactor AppendDeletionFileMaintainer to separate bucketed and unaware (#3950)
c4762ccdd is described below

commit c4762ccddd198da0b355a69cd7c083ab18b116fa
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Aug 13 18:08:09 2024 +0800

    [core] Refactor AppendDeletionFileMaintainer to separate bucketed and unaware (#3950)
---
 .../deletionvectors/DeletionVectorsMaintainer.java | 16 +++++
 .../append/AppendDeletionFileMaintainer.java       | 72 ++++++++++++++++++++++
 .../BucketedAppendDeletionFileMaintainer.java      | 65 +++++++++++++++++++
 .../UnawareAppendDeletionFileMaintainer.java}      | 71 +++++++++------------
 .../org/apache/paimon/index/IndexFileHandler.java  |  6 --
 .../java/org/apache/paimon/table/BucketMode.java   |  4 +-
 .../table/sink/UnawareBucketRowKeyExtractor.java   |  3 +-
 .../org/apache/paimon/TestAppendFileStore.java     | 12 ++--
 .../AppendDeletionFileMaintainerTest.java}         | 17 ++---
 .../paimon/spark/commands/PaimonSparkWriter.scala  | 19 +++---
 10 files changed, 216 insertions(+), 69 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
index 8079d977c..8dd6af1de 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
@@ -75,6 +75,22 @@ public class DeletionVectorsMaintainer {
         modified = true;
     }
 
+    /**
+     * Merge a new deletion which marks the specified deletion vector with the given file name. If
+     * a deletion vector already exists for the file, it is merged into {@code deletionVector}.
+     *
+     * @param fileName The name of the file where the deletion occurred.
+     * @param deletionVector The deletion vector
+     */
+    public void mergeNewDeletion(String fileName, DeletionVector 
deletionVector) {
+        DeletionVector old = deletionVectors.get(fileName);
+        if (old != null) {
+            deletionVector.merge(old);
+        }
+        deletionVectors.put(fileName, deletionVector);
+        modified = true;
+    }
+
     /**
      * Removes the specified file's deletion vector, this method is typically 
used for remove before
      * files' deletion vector in compaction.
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainer.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainer.java
new file mode 100644
index 000000000..4922e45a6
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainer.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.deletionvectors.append;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.deletionvectors.DeletionVector;
+import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.table.source.DeletionFile;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.table.BucketMode.UNAWARE_BUCKET;
+
+/**
+ * Maintains the deletion files of an append table. The core methods are:
+ *
+ * <ul>
+ *   <li>{@link #notifyDeletionFiles}: Mark the deletion of data files, create new deletion vectors.
+ *   <li>{@link #persist}: persist deletion files to commit.
+ * </ul>
+ */
+public interface AppendDeletionFileMaintainer {
+
+    BinaryRow getPartition();
+
+    int getBucket();
+
+    void notifyDeletionFiles(String dataFile, DeletionVector deletionVector);
+
+    List<IndexManifestEntry> persist();
+
+    static AppendDeletionFileMaintainer forBucketedAppend(
+            IndexFileHandler indexFileHandler,
+            @Nullable Long snapshotId,
+            BinaryRow partition,
+            int bucket) {
+        // A bucket should have only one deletion file, so restore its old deletion vectors here
+        // and overwrite the whole deletion file of the bucket when writing deletes.
+        DeletionVectorsMaintainer maintainer =
+                new DeletionVectorsMaintainer.Factory(indexFileHandler)
+                        .createOrRestore(snapshotId, partition, bucket);
+        return new BucketedAppendDeletionFileMaintainer(partition, bucket, 
maintainer);
+    }
+
+    static AppendDeletionFileMaintainer forUnawareAppend(
+            IndexFileHandler indexFileHandler, @Nullable Long snapshotId, 
BinaryRow partition) {
+        Map<String, DeletionFile> deletionFiles =
+                indexFileHandler.scanDVIndex(snapshotId, partition, 
UNAWARE_BUCKET);
+        return new UnawareAppendDeletionFileMaintainer(indexFileHandler, 
partition, deletionFiles);
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BucketedAppendDeletionFileMaintainer.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BucketedAppendDeletionFileMaintainer.java
new file mode 100644
index 000000000..1b839575f
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BucketedAppendDeletionFileMaintainer.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.deletionvectors.append;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.deletionvectors.DeletionVector;
+import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.IndexManifestEntry;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** A {@link AppendDeletionFileMaintainer} of bucketed append table. */
+public class BucketedAppendDeletionFileMaintainer implements 
AppendDeletionFileMaintainer {
+
+    private final BinaryRow partition;
+    private final int bucket;
+    private final DeletionVectorsMaintainer maintainer;
+
+    BucketedAppendDeletionFileMaintainer(
+            BinaryRow partition, int bucket, DeletionVectorsMaintainer 
maintainer) {
+        this.partition = partition;
+        this.bucket = bucket;
+        this.maintainer = maintainer;
+    }
+
+    @Override
+    public BinaryRow getPartition() {
+        return this.partition;
+    }
+
+    @Override
+    public int getBucket() {
+        return this.bucket;
+    }
+
+    @Override
+    public void notifyDeletionFiles(String dataFile, DeletionVector 
deletionVector) {
+        maintainer.mergeNewDeletion(dataFile, deletionVector);
+    }
+
+    @Override
+    public List<IndexManifestEntry> persist() {
+        return maintainer.writeDeletionVectorsIndex().stream()
+                .map(fileMeta -> new IndexManifestEntry(FileKind.ADD, 
partition, bucket, fileMeta))
+                .collect(Collectors.toList());
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/UnawareAppendDeletionFileMaintainer.java
similarity index 74%
rename from 
paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java
rename to 
paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/UnawareAppendDeletionFileMaintainer.java
index 39a0c7592..de6baac9f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/UnawareAppendDeletionFileMaintainer.java
@@ -16,10 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.deletionvectors;
+package org.apache.paimon.deletionvectors.append;
 
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.deletionvectors.DeletionVector;
+import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
+import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.index.IndexFileMeta;
@@ -35,13 +38,14 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-/** DeletionVectorIndexFileMaintainer. */
-public class DeletionVectorIndexFileMaintainer {
+import static org.apache.paimon.table.BucketMode.UNAWARE_BUCKET;
+
+/** A {@link AppendDeletionFileMaintainer} of unaware bucket append table. */
+public class UnawareAppendDeletionFileMaintainer implements 
AppendDeletionFileMaintainer {
 
     private final IndexFileHandler indexFileHandler;
 
     private final BinaryRow partition;
-    private final int bucket;
     private final Map<String, IndexManifestEntry> indexNameToEntry = new 
HashMap<>();
 
     private final Map<String, Map<String, DeletionFile>> 
indexFileToDeletionFiles = new HashMap<>();
@@ -51,26 +55,16 @@ public class DeletionVectorIndexFileMaintainer {
 
     private final DeletionVectorsMaintainer maintainer;
 
-    // the key of dataFileToDeletionFiles is the relative path again table's 
location.
-    public DeletionVectorIndexFileMaintainer(
+    UnawareAppendDeletionFileMaintainer(
             IndexFileHandler indexFileHandler,
-            Long snapshotId,
             BinaryRow partition,
-            int bucket,
-            boolean restore) {
+            Map<String, DeletionFile> deletionFiles) {
         this.indexFileHandler = indexFileHandler;
         this.partition = partition;
-        this.bucket = bucket;
-        if (restore) {
-            this.maintainer =
-                    new DeletionVectorsMaintainer.Factory(indexFileHandler)
-                            .createOrRestore(snapshotId, partition, bucket);
-        } else {
-            this.maintainer = new 
DeletionVectorsMaintainer.Factory(indexFileHandler).create();
-        }
-        Map<String, DeletionFile> dataFileToDeletionFiles =
-                indexFileHandler.scanDVIndex(snapshotId, partition, bucket);
-        init(dataFileToDeletionFiles);
+        // Deletion files of unaware-bucket data files are independent of each other, so start
+        // from an empty maintainer; existing deletion files are registered through init() below.
+        this.maintainer = new 
DeletionVectorsMaintainer.Factory(indexFileHandler).create();
+        init(deletionFiles);
     }
 
     @VisibleForTesting
@@ -98,14 +92,17 @@ public class DeletionVectorIndexFileMaintainer {
         }
     }
 
+    @Override
     public BinaryRow getPartition() {
         return this.partition;
     }
 
+    @Override
     public int getBucket() {
-        return this.bucket;
+        return UNAWARE_BUCKET;
     }
 
+    @Override
     public void notifyDeletionFiles(String dataFile, DeletionVector 
deletionVector) {
         DeletionVectorsIndexFile deletionVectorsIndexFile = 
indexFileHandler.deletionVectorsIndex();
         DeletionFile previous = null;
@@ -122,17 +119,7 @@ public class DeletionVectorIndexFileMaintainer {
         maintainer.notifyNewDeletion(dataFile, deletionVector);
     }
 
-    public void notifyDeletionFiles(Map<String, DeletionFile> 
dataFileToDeletionFiles) {
-        for (String dataFile : dataFileToDeletionFiles.keySet()) {
-            DeletionFile deletionFile = dataFileToDeletionFiles.get(dataFile);
-            String indexFileName = new Path(deletionFile.path()).getName();
-            touchedIndexFiles.add(indexFileName);
-            if (indexFileToDeletionFiles.containsKey(indexFileName)) {
-                indexFileToDeletionFiles.get(indexFileName).remove(dataFile);
-            }
-        }
-    }
-
+    @Override
     public List<IndexManifestEntry> persist() {
         List<IndexManifestEntry> result = writeUnchangedDeletionVector();
         List<IndexManifestEntry> newIndexFileEntries =
@@ -140,13 +127,14 @@ public class DeletionVectorIndexFileMaintainer {
                         .map(
                                 fileMeta ->
                                         new IndexManifestEntry(
-                                                FileKind.ADD, partition, 
bucket, fileMeta))
+                                                FileKind.ADD, partition, 
UNAWARE_BUCKET, fileMeta))
                         .collect(Collectors.toList());
         result.addAll(newIndexFileEntries);
         return result;
     }
 
-    public List<IndexManifestEntry> writeUnchangedDeletionVector() {
+    @VisibleForTesting
+    List<IndexManifestEntry> writeUnchangedDeletionVector() {
         DeletionVectorsIndexFile deletionVectorsIndexFile = 
indexFileHandler.deletionVectorsIndex();
         List<IndexManifestEntry> newIndexEntries = new ArrayList<>();
         for (String indexFile : indexFileToDeletionFiles.keySet()) {
@@ -162,14 +150,13 @@ public class DeletionVectorIndexFileMaintainer {
                                     
deletionVectorsIndexFile.readDeletionVector(
                                             dataFileToDeletionFiles));
                     newIndexFiles.forEach(
-                            newIndexFile -> {
-                                newIndexEntries.add(
-                                        new IndexManifestEntry(
-                                                FileKind.ADD,
-                                                oldEntry.partition(),
-                                                oldEntry.bucket(),
-                                                newIndexFile));
-                            });
+                            newIndexFile ->
+                                    newIndexEntries.add(
+                                            new IndexManifestEntry(
+                                                    FileKind.ADD,
+                                                    oldEntry.partition(),
+                                                    oldEntry.bucket(),
+                                                    newIndexFile)));
                 }
 
                 // mark the touched index file as removed.
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java 
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
index ee6cbe769..bdf47b16a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
@@ -21,7 +21,6 @@ package org.apache.paimon.index;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.deletionvectors.DeletionVector;
-import org.apache.paimon.deletionvectors.DeletionVectorIndexFileMaintainer;
 import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.IndexManifestEntry;
@@ -271,11 +270,6 @@ public class IndexFileHandler {
         indexManifestFile.delete(indexManifest);
     }
 
-    public DeletionVectorIndexFileMaintainer createDVIndexFileMaintainer(
-            Long snapshotId, BinaryRow partition, int bucket, boolean restore) 
{
-        return new DeletionVectorIndexFileMaintainer(this, snapshotId, 
partition, bucket, restore);
-    }
-
     public Map<String, DeletionVector> 
readAllDeletionVectors(List<IndexFileMeta> fileMetas) {
         for (IndexFileMeta indexFile : fileMetas) {
             checkArgument(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/BucketMode.java 
b/paimon-core/src/main/java/org/apache/paimon/table/BucketMode.java
index 2fcc8822a..c3b7ca1ab 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/BucketMode.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/BucketMode.java
@@ -57,5 +57,7 @@ public enum BucketMode {
      * Ignoring bucket concept, although all data is written to bucket-0, the 
parallelism of reads
      * and writes is unrestricted. This mode only works for append-only table.
      */
-    BUCKET_UNAWARE
+    BUCKET_UNAWARE;
+
+    public static final int UNAWARE_BUCKET = 0;
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/UnawareBucketRowKeyExtractor.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/UnawareBucketRowKeyExtractor.java
index e99133899..ad8a3cd83 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/UnawareBucketRowKeyExtractor.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/UnawareBucketRowKeyExtractor.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.sink;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.BucketMode;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
@@ -37,6 +38,6 @@ public class UnawareBucketRowKeyExtractor extends 
RowKeyExtractor {
 
     @Override
     public int bucket() {
-        return 0;
+        return BucketMode.UNAWARE_BUCKET;
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java 
b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
index c86b1cb40..a68779226 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
@@ -19,8 +19,9 @@
 package org.apache.paimon;
 
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.deletionvectors.DeletionVectorIndexFileMaintainer;
 import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.deletionvectors.append.AppendDeletionFileMaintainer;
+import 
org.apache.paimon.deletionvectors.append.UnawareAppendDeletionFileMaintainer;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileIOFinder;
 import org.apache.paimon.fs.Path;
@@ -118,10 +119,11 @@ public class TestAppendFileStore extends 
AppendOnlyFileStore {
         return fileHandler.scan(lastSnapshotId, DELETION_VECTORS_INDEX, 
partition, bucket);
     }
 
-    public DeletionVectorIndexFileMaintainer createDVIFMaintainer(
-            BinaryRow partition, int bucket, Map<String, DeletionFile> 
dataFileToDeletionFiles) {
-        DeletionVectorIndexFileMaintainer maintainer =
-                new DeletionVectorIndexFileMaintainer(fileHandler, null, 
partition, bucket, false);
+    public UnawareAppendDeletionFileMaintainer createDVIFMaintainer(
+            BinaryRow partition, Map<String, DeletionFile> 
dataFileToDeletionFiles) {
+        UnawareAppendDeletionFileMaintainer maintainer =
+                (UnawareAppendDeletionFileMaintainer)
+                        
AppendDeletionFileMaintainer.forUnawareAppend(fileHandler, null, partition);
         maintainer.init(dataFileToDeletionFiles);
         return maintainer;
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
similarity index 91%
rename from 
paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainerTest.java
rename to 
paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
index f78e39dfb..2ebc30cf9 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
@@ -16,10 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.deletionvectors;
+package org.apache.paimon.deletionvectors.append;
 
 import org.apache.paimon.TestAppendFileStore;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.deletionvectors.DeletionVector;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.IndexManifestEntry;
@@ -39,8 +42,7 @@ import java.util.Map;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Test for DeletionVectorIndexFileMaintainer. */
-public class DeletionVectorIndexFileMaintainerTest {
+class AppendDeletionFileMaintainerTest {
 
     @TempDir java.nio.file.Path tempDir;
 
@@ -68,8 +70,8 @@ public class DeletionVectorIndexFileMaintainerTest {
                 createDeletionFileMapFromIndexFileMetas(
                         indexPathFactory, 
commitMessage2.indexIncrement().newIndexFiles()));
 
-        DeletionVectorIndexFileMaintainer dvIFMaintainer =
-                store.createDVIFMaintainer(BinaryRow.EMPTY_ROW, 1, 
dataFileToDeletionFiles);
+        UnawareAppendDeletionFileMaintainer dvIFMaintainer =
+                store.createDVIFMaintainer(BinaryRow.EMPTY_ROW, 
dataFileToDeletionFiles);
 
         // no dv should be rewritten, because nothing is changed.
         List<IndexManifestEntry> res = 
dvIFMaintainer.writeUnchangedDeletionVector();
@@ -77,8 +79,9 @@ public class DeletionVectorIndexFileMaintainerTest {
 
         // the dv of f3 is updated, and the index file that contains the dv of 
f3 should be marked
         // as REMOVE.
+        FileIO fileIO = LocalFileIO.create();
         dvIFMaintainer.notifyDeletionFiles(
-                Collections.singletonMap("f3", 
dataFileToDeletionFiles.get("f3")));
+                "f3", DeletionVector.read(fileIO, 
dataFileToDeletionFiles.get("f3")));
         res = dvIFMaintainer.writeUnchangedDeletionVector();
         assertThat(res.size()).isEqualTo(1);
         assertThat(res.get(0).kind()).isEqualTo(FileKind.DELETE);
@@ -86,7 +89,7 @@ public class DeletionVectorIndexFileMaintainerTest {
         // the dv of f1 and f2 are in one index file, and the dv of f1 is 
updated.
         // the dv of f2 need to be rewritten, and this index file should be 
marked as REMOVE.
         dvIFMaintainer.notifyDeletionFiles(
-                Collections.singletonMap("f1", 
dataFileToDeletionFiles.get("f1")));
+                "f1", DeletionVector.read(fileIO, 
dataFileToDeletionFiles.get("f1")));
         res = dvIFMaintainer.writeUnchangedDeletionVector();
         assertThat(res.size()).isEqualTo(3);
         IndexManifestEntry entry =
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index d12909b8a..7bdd0ce60 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -19,7 +19,8 @@
 package org.apache.paimon.spark.commands
 
 import org.apache.paimon.CoreOptions.WRITE_ONLY
-import org.apache.paimon.deletionvectors.{DeletionVector, 
DeletionVectorIndexFileMaintainer}
+import org.apache.paimon.deletionvectors.DeletionVector
+import org.apache.paimon.deletionvectors.append.{AppendDeletionFileMaintainer, 
UnawareAppendDeletionFileMaintainer}
 import org.apache.paimon.index.{BucketAssigner, SimpleHashBucketAssigner}
 import org.apache.paimon.io.{CompactIncrement, DataIncrement, IndexIncrement}
 import org.apache.paimon.manifest.{FileKind, IndexManifestEntry}
@@ -27,6 +28,7 @@ import org.apache.paimon.spark.{SparkRow, SparkTableWrite}
 import org.apache.paimon.spark.schema.SparkSystemColumns.{BUCKET_COL, 
ROW_KIND_COL}
 import org.apache.paimon.spark.util.SparkRowUtils
 import org.apache.paimon.table.{BucketMode, FileStoreTable}
+import org.apache.paimon.table.BucketMode.BUCKET_UNAWARE
 import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage, 
CommitMessageImpl, CommitMessageSerializer, RowPartitionKeyExtractor}
 import org.apache.paimon.utils.SerializationUtils
 
@@ -179,7 +181,7 @@ case class PaimonSparkWriter(table: FileStoreTable) {
               numAssigners,
               encoderGroupWithBucketCol))
         }
-      case BucketMode.BUCKET_UNAWARE =>
+      case BUCKET_UNAWARE =>
         // Topology: input ->
         writeWithoutBucket()
       case BucketMode.HASH_FIXED =>
@@ -211,17 +213,20 @@ case class PaimonSparkWriter(table: FileStoreTable) {
       .mapGroups {
         case (_, iter: Iterator[SparkDeletionVectors]) =>
           val indexHandler = table.store().newIndexFileHandler()
-          var dvIndexFileMaintainer: DeletionVectorIndexFileMaintainer = null
+          var dvIndexFileMaintainer: AppendDeletionFileMaintainer = null
           while (iter.hasNext) {
             val sdv: SparkDeletionVectors = iter.next()
             if (dvIndexFileMaintainer == null) {
               val partition = 
SerializationUtils.deserializeBinaryRow(sdv.partition)
-              dvIndexFileMaintainer = indexHandler
-                .createDVIndexFileMaintainer(
+              dvIndexFileMaintainer = if (bucketMode == BUCKET_UNAWARE) {
+                AppendDeletionFileMaintainer.forUnawareAppend(indexHandler, 
snapshotId, partition)
+              } else {
+                AppendDeletionFileMaintainer.forBucketedAppend(
+                  indexHandler,
                   snapshotId,
                   partition,
-                  sdv.bucket,
-                  bucketMode != BucketMode.BUCKET_UNAWARE)
+                  sdv.bucket)
+              }
             }
             if (dvIndexFileMaintainer == null) {
               throw new RuntimeException("can't create the dv maintainer.")

Reply via email to