This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new be2996efe  [spark] refactor delete with deletion vector to parallel write deletion file (#3744)
be2996efe is described below

commit be2996efe24b3757099261d2287a923771f08555
Author: Yann Byron <[email protected]>
AuthorDate: Mon Jul 29 10:57:17 2024 +0800

     [spark] refactor delete with deletion vector to parallel write deletion file (#3744)
---
 .../DeletionVectorIndexFileMaintainer.java         | 68 +++++++++++++++++-
 .../deletionvectors/DeletionVectorsIndexFile.java  | 13 ++++
 .../org/apache/paimon/index/IndexFileHandler.java  | 78 +++++++++++++++++++-
 .../org/apache/paimon/TestAppendFileStore.java     |  7 +-
 .../DeletionVectorIndexFileMaintainerTest.java     |  2 +-
 .../commands/DeleteFromPaimonTableCommand.scala    | 29 ++++----
 .../paimon/spark/commands/PaimonCommand.scala      | 77 ++------------------
 .../paimon/spark/commands/PaimonSparkWriter.scala  | 83 ++++++++++------------
 .../paimon/spark/commands/SparkDataFileMeta.scala  |  6 --
 .../spark/commands/UpdatePaimonTableCommand.scala  |  2 +-
 .../org/apache/paimon/spark/util/SQLHelper.scala   | 55 ++++++++++++++
 11 files changed, 277 insertions(+), 143 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java
index 50e921f0d..39a0c7592 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.deletionvectors;
 
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.index.IndexFileMeta;
@@ -38,15 +40,41 @@ public class DeletionVectorIndexFileMaintainer {
 
     private final IndexFileHandler indexFileHandler;
 
+    private final BinaryRow partition;
+    private final int bucket;
     private final Map<String, IndexManifestEntry> indexNameToEntry = new 
HashMap<>();
 
     private final Map<String, Map<String, DeletionFile>> 
indexFileToDeletionFiles = new HashMap<>();
+    private final Map<String, String> dataFileToIndexFile = new HashMap<>();
 
     private final Set<String> touchedIndexFiles = new HashSet<>();
 
+    private final DeletionVectorsMaintainer maintainer;
+
+    // restore=true: maintainer from createOrRestore(snapshotId, partition, bucket); else create().
     public DeletionVectorIndexFileMaintainer(
-            IndexFileHandler indexFileHandler, Map<String, DeletionFile> 
dataFileToDeletionFiles) {
+            IndexFileHandler indexFileHandler,
+            Long snapshotId,
+            BinaryRow partition,
+            int bucket,
+            boolean restore) {
         this.indexFileHandler = indexFileHandler;
+        this.partition = partition;
+        this.bucket = bucket;
+        if (restore) {
+            this.maintainer =
+                    new DeletionVectorsMaintainer.Factory(indexFileHandler)
+                            .createOrRestore(snapshotId, partition, bucket);
+        } else {
+            this.maintainer = new 
DeletionVectorsMaintainer.Factory(indexFileHandler).create();
+        }
+        Map<String, DeletionFile> dataFileToDeletionFiles =
+                indexFileHandler.scanDVIndex(snapshotId, partition, bucket);
+        init(dataFileToDeletionFiles); // deletion files already indexed for this partition/bucket
+    }
+
+    @VisibleForTesting
+    public void init(Map<String, DeletionFile> dataFileToDeletionFiles) {
         List<String> touchedIndexFileNames =
                 dataFileToDeletionFiles.values().stream()
                         .map(deletionFile -> new 
Path(deletionFile.path()).getName())
@@ -66,7 +94,32 @@ public class DeletionVectorIndexFileMaintainer {
                 indexFileToDeletionFiles.put(indexFileName, new HashMap<>());
             }
             indexFileToDeletionFiles.get(indexFileName).put(dataFile, 
deletionFile);
+            dataFileToIndexFile.put(dataFile, indexFileName);
+        }
+    }
+
+    public int getBucket() {
+        return bucket;
+    }
+
+    public BinaryRow getPartition() {
+        return partition;
+    }
+
+    public void notifyDeletionFiles(String dataFile, DeletionVector 
deletionVector) { // merges dataFile's existing DV, if any, into deletionVector
+        DeletionVectorsIndexFile deletionVectorsIndexFile = 
indexFileHandler.deletionVectorsIndex();
+        DeletionFile previous = null; // already-indexed deletion file of dataFile, if any
+        if (dataFileToIndexFile.containsKey(dataFile)) {
+            String indexFileName = dataFileToIndexFile.get(dataFile);
+            touchedIndexFiles.add(indexFileName);
+            if (indexFileToDeletionFiles.containsKey(indexFileName)) {
+                previous = 
indexFileToDeletionFiles.get(indexFileName).remove(dataFile);
+            }
         }
+        if (previous != null) { // keep the rows that were already marked as deleted
+            
deletionVector.merge(deletionVectorsIndexFile.readDeletionVector(dataFile, 
previous));
+        }
+        maintainer.notifyNewDeletion(dataFile, deletionVector);
     }
 
     public void notifyDeletionFiles(Map<String, DeletionFile> 
dataFileToDeletionFiles) {
@@ -80,6 +133,19 @@ public class DeletionVectorIndexFileMaintainer {
         }
     }
 
+    /**
+     * Returns the entries produced by writeUnchangedDeletionVector(), followed by one ADD entry
+     * for this partition and bucket per index file returned by the maintainer's
+     * writeDeletionVectorsIndex().
+     */
+    public List<IndexManifestEntry> persist() {
+        List<IndexManifestEntry> entries = writeUnchangedDeletionVector();
+        for (IndexFileMeta fileMeta : maintainer.writeDeletionVectorsIndex()) {
+            entries.add(new IndexManifestEntry(FileKind.ADD, partition, bucket, fileMeta));
+        }
+        return entries;
+    }
+
     public List<IndexManifestEntry> writeUnchangedDeletionVector() {
         DeletionVectorsIndexFile deletionVectorsIndexFile = 
indexFileHandler.deletionVectorsIndex();
         List<IndexManifestEntry> newIndexEntries = new ArrayList<>();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
index f21aeb5a7..ffb024734 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
@@ -121,6 +121,19 @@ public class DeletionVectorsIndexFile extends IndexFile {
         return deletionVectors;
     }
 
+    /** Reads the deletion vector stored at {@code deletionFile}'s offset and length. */
+    public DeletionVector readDeletionVector(String dataFile, DeletionFile deletionFile) {
+        String indexFile = deletionFile.path();
+        try (SeekableInputStream inputStream = fileIO.newInputStream(new Path(indexFile))) {
+            checkVersion(inputStream);
+            inputStream.seek(deletionFile.offset());
+            DataInputStream dataInputStream = new DataInputStream(inputStream);
+            return readDeletionVector(dataInputStream, (int) deletionFile.length());
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to read deletion vector from file: " + indexFile, e);
+        }
+    }
+
     /**
      * Write deletion vectors to a new file, the format of this file can be 
referenced at: <a
      * href="https://cwiki.apache.org/confluence/x/Tws4EQ";>PIP-16</a>.
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java 
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
index afafb42ec..eed18af17 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
@@ -32,11 +32,14 @@ import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.PathFactory;
 import org.apache.paimon.utils.SnapshotManager;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -45,6 +48,7 @@ import java.util.Set;
 import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
 import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
 /** Handle index files. */
 public class IndexFileHandler {
@@ -81,6 +85,56 @@ public class IndexFileHandler {
         return result.isEmpty() ? Optional.empty() : 
Optional.of(result.get(0));
     }
 
+    /** Returns the DV index deletion files of (partition, bucket) at snapshotId, by data file. */
+    public Map<String, DeletionFile> scanDVIndex(
+            @Nullable Long snapshotId, BinaryRow partition, int bucket) {
+        // no snapshot id, or a snapshot without index manifest: nothing to scan
+        String indexManifest =
+                snapshotId == null ? null : snapshotManager.snapshot(snapshotId).indexManifest();
+        if (indexManifest == null) {
+            return Collections.emptyMap();
+        }
+        Map<String, DeletionFile> result = new HashMap<>();
+        for (IndexManifestEntry file : indexManifestFile.read(indexManifest)) {
+            IndexFileMeta meta = file.indexFile();
+            if (meta.indexType().equals(DELETION_VECTORS_INDEX)
+                    && file.partition().equals(partition)
+                    && file.bucket() == bucket) {
+                LinkedHashMap<String, Pair<Integer, Integer>> dvRanges =
+                        meta.deletionVectorsRanges();
+                checkNotNull(dvRanges);
+                // every range of this entry points into the same index file: resolve it once
+                String indexFilePath = filePath(meta).toString();
+                for (Map.Entry<String, Pair<Integer, Integer>> range : dvRanges.entrySet()) {
+                    Pair<Integer, Integer> pair = range.getValue();
+                    result.put(
+                            range.getKey(),
+                            new DeletionFile(indexFilePath, pair.getLeft(), pair.getRight()));
+                }
+            }
+        }
+        return result;
+    }
+
+    /** Returns the entries of the given index type in the latest snapshot's index manifest. */
+    public List<IndexManifestEntry> scan(String indexType) {
+        Snapshot latest = snapshotManager.latestSnapshot();
+        String indexManifest = latest == null ? null : latest.indexManifest();
+        if (indexManifest == null) {
+            return Collections.emptyList();
+        }
+
+        List<IndexManifestEntry> matching = new ArrayList<>();
+        for (IndexManifestEntry entry : indexManifestFile.read(indexManifest)) {
+            String type = entry.indexFile().indexType();
+            if (!type.equals(indexType)) {
+                continue;
+            }
+            matching.add(entry);
+        }
+        return matching;
+    }
+
     public List<IndexFileMeta> scan(
             long snapshotId, String indexType, BinaryRow partition, int 
bucket) {
         List<IndexFileMeta> result = new ArrayList<>();
@@ -143,6 +197,26 @@ public class IndexFileHandler {
         return result;
     }
 
+    public List<IndexManifestEntry> scanEntries(String indexType, BinaryRow 
partition, int bucket) {
+        Snapshot snapshot = snapshotManager.latestSnapshot();
+        if (snapshot == null) {
+            return Collections.emptyList();
+        }
+        String indexManifest = snapshot.indexManifest();
+        if (indexManifest == null) {
+            return Collections.emptyList();
+        }
+        List<IndexManifestEntry> result = new ArrayList<>();
+        for (IndexManifestEntry file : indexManifestFile.read(indexManifest)) {
+            if (file.indexFile().indexType().equals(indexType)
+                    && file.partition().equals(partition)
+                    && file.bucket() == bucket) {
+                result.add(file);
+            }
+        }
+        return result;
+    }
+
     public Path filePath(IndexFileMeta file) {
         return pathFactory.toPath(file.fileName());
     }
@@ -218,8 +292,8 @@ public class IndexFileHandler {
     }
 
     public DeletionVectorIndexFileMaintainer createDVIndexFileMaintainer(
-            Map<String, DeletionFile> dataFileToDeletionFiles) {
-        return new DeletionVectorIndexFileMaintainer(this, 
dataFileToDeletionFiles);
+            Long snapshotId, BinaryRow partition, int bucket, boolean restore) 
{ // snapshotId may be null: scanDVIndex then returns no deletion files
+        return new DeletionVectorIndexFileMaintainer(this, snapshotId, 
partition, bucket, restore);
     }
 
     public Map<String, DeletionVector> 
readAllDeletionVectors(List<IndexFileMeta> fileMetas) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java 
b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
index d01210e47..c86b1cb40 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
@@ -119,8 +119,11 @@ public class TestAppendFileStore extends 
AppendOnlyFileStore {
     }
 
     public DeletionVectorIndexFileMaintainer createDVIFMaintainer(
-            Map<String, DeletionFile> dataFileToDeletionFiles) {
-        return new DeletionVectorIndexFileMaintainer(fileHandler, 
dataFileToDeletionFiles);
+            BinaryRow partition, int bucket, Map<String, DeletionFile> 
dataFileToDeletionFiles) {
+        DeletionVectorIndexFileMaintainer maintainer =
+                new DeletionVectorIndexFileMaintainer(fileHandler, null, 
partition, bucket, false); // null snapshot: no existing DV index is scanned
+        maintainer.init(dataFileToDeletionFiles); // inject the test's deletion files instead
+        return maintainer;
     }
 
     public DeletionVectorsMaintainer createOrRestoreDVMaintainer(BinaryRow 
partition, int bucket) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainerTest.java
index 20cb47557..f78e39dfb 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainerTest.java
@@ -69,7 +69,7 @@ public class DeletionVectorIndexFileMaintainerTest {
                         indexPathFactory, 
commitMessage2.indexIncrement().newIndexFiles()));
 
         DeletionVectorIndexFileMaintainer dvIFMaintainer =
-                store.createDVIFMaintainer(dataFileToDeletionFiles);
+                store.createDVIFMaintainer(BinaryRow.EMPTY_ROW, 1, 
dataFileToDeletionFiles);
 
         // no dv should be rewritten, because nothing is changed.
         List<IndexManifestEntry> res = 
dvIFMaintainer.writeUnchangedDeletionVector();
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index 4bcd9dce6..2aef8e576 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -25,6 +25,7 @@ import org.apache.paimon.spark.catalyst.Compatibility
 import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
 import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
 import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
+import org.apache.paimon.spark.util.SQLHelper
 import org.apache.paimon.table.{BucketMode, FileStoreTable}
 import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage}
 import org.apache.paimon.types.RowKind
@@ -49,7 +50,8 @@ case class DeleteFromPaimonTableCommand(
   extends PaimonLeafRunnableCommand
   with PaimonCommand
   with ExpressionHelper
-  with SupportsSubquery {
+  with SupportsSubquery
+  with SQLHelper {
 
   private lazy val writer = PaimonSparkWriter(table)
 
@@ -124,21 +126,18 @@ case class DeleteFromPaimonTableCommand(
     val dataFilePathToMeta = candidateFileMap(candidateDataSplits)
 
     if (deletionVectorsEnabled) {
-      // Step2: collect all the deletion vectors that marks the deleted rows.
-      val deletionVectors = collectDeletionVectors(
-        candidateDataSplits,
-        dataFilePathToMeta,
-        condition,
-        relation,
-        sparkSession)
-
-      deletionVectors.cache()
-      try {
-        updateDeletionVector(deletionVectors, dataFilePathToMeta, writer)
-      } finally {
-        deletionVectors.unpersist()
-      }
+      withSQLConf("spark.sql.adaptive.enabled" -> "false") {
+        // Step2: collect all the deletion vectors that mark the deleted rows.
+        val deletionVectors = collectDeletionVectors(
+          candidateDataSplits,
+          dataFilePathToMeta,
+          condition,
+          relation,
+          sparkSession)
 
+        // Step3: update the touched deletion vectors and index files
+        writer.persistDeletionVectors(deletionVectors)
+      }
     } else {
       // Step2: extract out the exactly files, which must have at least one 
record to be updated.
       val touchedFilePaths =
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index abd89b0b6..d5dd1e19e 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -22,14 +22,12 @@ import 
org.apache.paimon.deletionvectors.{BitmapDeletionVector, DeletionVector}
 import org.apache.paimon.fs.Path
 import org.apache.paimon.index.IndexFileMeta
 import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement, 
IndexIncrement}
-import org.apache.paimon.manifest.IndexManifestEntry
 import org.apache.paimon.spark.{PaimonSplitScan, SparkFilterConverter}
 import org.apache.paimon.spark.catalyst.Compatibility
 import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
 import 
org.apache.paimon.spark.commands.SparkDataFileMeta.convertToSparkDataFileMeta
 import org.apache.paimon.spark.schema.PaimonMetadataColumn
 import org.apache.paimon.spark.schema.PaimonMetadataColumn._
-import org.apache.paimon.table.BucketMode
 import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl}
 import org.apache.paimon.table.source.DataSplit
 import org.apache.paimon.types.RowType
@@ -148,62 +146,6 @@ trait PaimonCommand extends WithFileStoreTable with 
ExpressionHelper {
       .toMap
   }
 
-  protected def getDeletedIndexFiles(
-      dataFilePathToMeta: Map[String, SparkDataFileMeta],
-      newDeletionVectors: Dataset[SparkDeletionVectors]
-  ): Seq[IndexManifestEntry] = {
-    val deletionFiles = dataFilePathToMeta.flatMap {
-      case (relativePath, sdf) =>
-        sdf.deletionFile match {
-          case Some(deletionFile) =>
-            Some((relativePath, deletionFile))
-          case None => None
-        }
-    }
-    val dvIndexFileMaintainer = fileStore
-      .newIndexFileHandler()
-      .createDVIndexFileMaintainer(deletionFiles.asJava)
-
-    val pathFactory = fileStore.pathFactory()
-    val touchedDataFileAndDeletionFiles = newDeletionVectors
-      .collect()
-      .flatMap {
-        sdv =>
-          val relativePaths = sdv.relativePaths(pathFactory)
-          relativePaths.flatMap {
-            relativePath =>
-              dataFilePathToMeta(relativePath).deletionFile match {
-                case Some(deletionFile) => Some(relativePath, deletionFile)
-                case _ => None
-              }
-          }
-      }
-      .toMap
-
-    
dvIndexFileMaintainer.notifyDeletionFiles(touchedDataFileAndDeletionFiles.asJava)
-
-    dvIndexFileMaintainer.writeUnchangedDeletionVector().asScala
-  }
-
-  protected def updateDeletionVector(
-      deletionVectors: Dataset[SparkDeletionVectors],
-      dataFilePathToMeta: Map[String, SparkDataFileMeta],
-      writer: PaimonSparkWriter): Seq[CommitMessage] = {
-    // Step1: write the new deletion vectors
-    val newIndexCommitMsg = writer.persistDeletionVectors(deletionVectors)
-
-    // Step2: write the unchanged deletion vectors where store in touched dv 
index files, and mark these touched index files as DELETE if needed.
-    val rewriteIndexCommitMsg = fileStore.bucketMode() match {
-      case BucketMode.BUCKET_UNAWARE =>
-        val indexEntries = getDeletedIndexFiles(dataFilePathToMeta, 
deletionVectors)
-        writer.buildCommitMessageFromIndexManifestEntry(indexEntries)
-      case _ =>
-        Seq.empty[CommitMessage]
-    }
-
-    newIndexCommitMsg ++ rewriteIndexCommitMsg
-  }
-
   protected def collectDeletionVectors(
       candidateDataSplits: Seq[DataSplit],
       dataFilePathToMeta: Map[String, SparkDataFileMeta],
@@ -212,12 +154,12 @@ trait PaimonCommand extends WithFileStoreTable with 
ExpressionHelper {
       sparkSession: SparkSession): Dataset[SparkDeletionVectors] = {
     import sparkSession.implicits._
 
-    val dataFileAndDeletionFile = 
dataFilePathToMeta.mapValues(_.toSparkDeletionFile).toArray
+    val dataFileToPartitionAndBucket =
+      dataFilePathToMeta.mapValues(meta => (meta.partition, 
meta.bucket)).toArray
     val metadataCols = Seq(FILE_PATH, ROW_INDEX)
     val filteredRelation = createNewScanPlan(candidateDataSplits, condition, 
relation, metadataCols)
 
     val store = table.store()
-    val fileIO = table.fileIO()
     val location = table.location
     createDataset(sparkSession, filteredRelation)
       .select(FILE_PATH_COLUMN, ROW_INDEX_COLUMN)
@@ -225,29 +167,22 @@ trait PaimonCommand extends WithFileStoreTable with 
ExpressionHelper {
       .groupByKey(_._1)
       .mapGroups {
         case (filePath, iter) =>
-          val fileNameToDeletionFile = dataFileAndDeletionFile.toMap
           val dv = new BitmapDeletionVector()
           while (iter.hasNext) {
             dv.delete(iter.next()._2)
           }
 
           val relativeFilePath = location.toUri.relativize(new 
URI(filePath)).toString
-          val sparkDeletionFile = fileNameToDeletionFile(relativeFilePath)
-          sparkDeletionFile.deletionFile match {
-            case Some(deletionFile) =>
-              dv.merge(DeletionVector.read(fileIO, deletionFile))
-            case None =>
-          }
-
+          val (partition, bucket) = 
dataFileToPartitionAndBucket.toMap.apply(relativeFilePath)
           val pathFactory = store.pathFactory()
           val partitionAndBucket = pathFactory
-            .relativePartitionAndBucketPath(sparkDeletionFile.partition, 
sparkDeletionFile.bucket)
+            .relativePartitionAndBucketPath(partition, bucket)
             .toString
 
           SparkDeletionVectors(
             partitionAndBucket,
-            SerializationUtils.serializeBinaryRow(sparkDeletionFile.partition),
-            sparkDeletionFile.bucket,
+            SerializationUtils.serializeBinaryRow(partition),
+            bucket,
             Seq((new Path(filePath).getName, dv.serializeToBytes()))
           )
       }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index 41f2db1a3..62fb4689d 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -20,7 +20,7 @@ package org.apache.paimon.spark.commands
 
 import org.apache.paimon.CoreOptions.WRITE_ONLY
 import org.apache.paimon.data.BinaryRow
-import org.apache.paimon.deletionvectors.{DeletionVector, 
DeletionVectorsMaintainer}
+import org.apache.paimon.deletionvectors.{DeletionVector, 
DeletionVectorIndexFileMaintainer}
 import org.apache.paimon.index.{BucketAssigner, SimpleHashBucketAssigner}
 import org.apache.paimon.io.{CompactIncrement, DataIncrement, IndexIncrement}
 import org.apache.paimon.manifest.{FileKind, IndexManifestEntry}
@@ -200,60 +200,55 @@ case class PaimonSparkWriter(table: FileStoreTable) {
 
   /**
    * Write all the deletion vectors to the index files. If it's in unaware 
mode, one index file maps
-   * one deletion vector; else, one index file will contains all deletion 
vector with the same
+   * deletion vectors; else, one index file will contain all deletion vectors with the same
    * partition and bucket.
    */
   def persistDeletionVectors(deletionVectors: Dataset[SparkDeletionVectors]): 
Seq[CommitMessage] = {
     val sparkSession = deletionVectors.sparkSession
     import sparkSession.implicits._
-
+    val snapshotId = table.snapshotManager().latestSnapshotId();
     val fileStore = table.store()
-    val lastSnapshotId = table.snapshotManager().latestSnapshotId()
-
-    def createOrRestoreDVMaintainer(
-        partition: BinaryRow,
-        bucket: Int): DeletionVectorsMaintainer = {
-      val deletionVectorsMaintainerFactory =
-        new DeletionVectorsMaintainer.Factory(fileStore.newIndexFileHandler())
-      fileStore.bucketMode() match {
-        case BucketMode.BUCKET_UNAWARE =>
-          deletionVectorsMaintainerFactory.create()
-        case _ =>
-          deletionVectorsMaintainerFactory.createOrRestore(lastSnapshotId, 
partition, bucket)
-      }
-    }
-
-    def commitDeletionVector(
-        sdv: SparkDeletionVectors,
-        serializer: CommitMessageSerializer): Array[Byte] = {
-      val partition = SerializationUtils.deserializeBinaryRow(sdv.partition)
-      val maintainer = createOrRestoreDVMaintainer(partition, sdv.bucket)
-      sdv.dataFileAndDeletionVector.foreach {
-        case (dataFileName, dv) =>
-          maintainer.notifyNewDeletion(dataFileName, 
DeletionVector.deserializeFromBytes(dv))
-      }
-
-      val commitMessage = new CommitMessageImpl(
-        partition,
-        sdv.bucket,
-        DataIncrement.emptyIncrement(),
-        CompactIncrement.emptyIncrement(),
-        new IndexIncrement(maintainer.writeDeletionVectorsIndex()))
-
-      serializer.serialize(commitMessage)
-    }
-
     val serializedCommits = deletionVectors
       .groupByKey(_.partitionAndBucket)
       .mapGroups {
         case (_, iter: Iterator[SparkDeletionVectors]) =>
-          val serializer = new CommitMessageSerializer
-          val grouped = iter.reduce {
-            (sdv1, sdv2) =>
-              sdv1.copy(dataFileAndDeletionVector =
-                sdv1.dataFileAndDeletionVector ++ 
sdv2.dataFileAndDeletionVector)
+          val indexHandler = fileStore.newIndexFileHandler()
+          var dvIndexFileMaintainer: DeletionVectorIndexFileMaintainer = null
+          while (iter.hasNext) {
+            val sdv: SparkDeletionVectors = iter.next()
+            if (dvIndexFileMaintainer == null) {
+              val partition = 
SerializationUtils.deserializeBinaryRow(sdv.partition)
+              dvIndexFileMaintainer = indexHandler
+                .createDVIndexFileMaintainer(
+                  snapshotId,
+                  partition,
+                  sdv.bucket,
+                  bucketMode != BucketMode.BUCKET_UNAWARE)
+            }
+            if (dvIndexFileMaintainer == null) {
+              throw new RuntimeException("can't create the dv maintainer.")
+            }
+
+            sdv.dataFileAndDeletionVector.foreach {
+              case (dataFileName, dv) =>
+                dvIndexFileMaintainer.notifyDeletionFiles(
+                  dataFileName,
+                  DeletionVector.deserializeFromBytes(dv))
+            }
           }
-          commitDeletionVector(grouped, serializer)
+          val indexEntries = dvIndexFileMaintainer.persist()
+
+          val (added, deleted) = indexEntries.asScala.partition(_.kind() == 
FileKind.ADD)
+
+          val commitMessage = new CommitMessageImpl(
+            dvIndexFileMaintainer.getPartition,
+            dvIndexFileMaintainer.getBucket,
+            DataIncrement.emptyIncrement(),
+            CompactIncrement.emptyIncrement(),
+            new IndexIncrement(added.map(_.indexFile).asJava, 
deleted.map(_.indexFile).asJava)
+          )
+          val serializer = new CommitMessageSerializer
+          serializer.serialize(commitMessage)
       }
     serializedCommits
       .collect()
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
index 1e0f1d6d3..b380d36c3 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
@@ -39,14 +39,8 @@ case class SparkDataFileMeta(
       .toUri
       .toString + "/" + dataFileMeta.fileName()
   }
-
-  def toSparkDeletionFile: SparkDeletionFile = {
-    SparkDeletionFile(partition, bucket, deletionFile)
-  }
 }
 
-case class SparkDeletionFile(partition: BinaryRow, bucket: Int, deletionFile: 
Option[DeletionFile])
-
 object SparkDataFileMeta {
   def convertToSparkDataFileMeta(
       dataSplit: DataSplit,
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
index ca12d6a1a..6c7d07bf5 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
@@ -104,7 +104,7 @@ case class UpdatePaimonTableCommand(
           val addCommitMessage = writeOnlyUpdatedData(sparkSession, 
touchedDataSplits)
 
           // Step4: write these deletion vectors.
-          val indexCommitMsg = updateDeletionVector(deletionVectors, 
dataFilePathToMeta, writer)
+          val indexCommitMsg = writer.persistDeletionVectors(deletionVectors)
 
           addCommitMessage ++ indexCommitMsg
         } finally {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SQLHelper.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SQLHelper.scala
new file mode 100644
index 000000000..88243b36d
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SQLHelper.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.util
+
+import org.apache.spark.sql.internal.SQLConf
+
+trait SQLHelper {
+
+  /**
+   * Sets all SQL configurations specified in `pairs`, calls `f`, and then restores all SQL
+   * configurations (keys that had no value before are unset again).
+   *
+   * Static configurations are rejected before anything is modified, and the values are applied
+   * inside the `try`, so a failure while setting one key (e.g. an invalid value) still restores
+   * the keys that were already changed.
+   */
+  protected def withSQLConf[T](pairs: (String, String)*)(f: => T): T = {
+    val conf = SQLConf.get
+    pairs.map(_._1).find(SQLConf.isStaticConfigKey).foreach {
+      k => throw new RuntimeException(s"Cannot modify the value of a static config: $k")
+    }
+
+    // Remember the current value (or absence) of every key so it can be restored exactly.
+    val previousValues = pairs.map {
+      case (key, _) =>
+        val previous = if (conf.contains(key)) Some(conf.getConfString(key)) else None
+        key -> previous
+    }
+    try {
+      pairs.foreach { case (key, value) => conf.setConfString(key, value) }
+      f
+    } finally {
+      previousValues.foreach {
+        case (key, Some(value)) => conf.setConfString(key, value)
+        case (key, None) => conf.unsetConf(key)
+      }
+    }
+  }
+}

Reply via email to