This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a2363e621 [spark] spark delete with deletion vector (#3390)
a2363e621 is described below

commit a2363e6210d4c1a289f895ee4ed96cb9188c757b
Author: Yann Byron <[email protected]>
AuthorDate: Mon May 27 16:38:52 2024 +0800

    [spark] spark delete with deletion vector (#3390)
---
 .../org/apache/paimon/utils/RoaringBitmap32.java   |  13 ++
 .../deletionvectors/BitmapDeletionVector.java      |  13 ++
 .../deletionvectors/DeletionVectorsMaintainer.java |  16 ++
 .../org/apache/paimon/index/IndexFileHandler.java  |   7 +
 .../org/apache/paimon/schema/SchemaValidation.java |   8 +-
 .../apache/paimon/table/source/DeletionFile.java   |   5 +-
 .../org/apache/paimon/spark/PaimonSplitScan.scala  |  10 +-
 .../commands/DeleteFromPaimonTableCommand.scala    |  88 ++++++----
 .../paimon/spark/commands/PaimonCommand.scala      | 112 +++++++++++-
 .../paimon/spark/commands/PaimonSparkWriter.scala  |  98 ++++++++++-
 .../paimon/spark/commands/SparkDataFileMeta.scala  |  30 +++-
 ...StoreTable.scala => SparkDeletionVectors.scala} |  27 ++-
 .../paimon/spark/commands/WithFileStoreTable.scala |   7 +
 .../paimon/spark/schema/PaimonMetadataColumn.scala |   5 +
 .../paimon/spark/sql/DeletionVectorTest.scala      | 192 ++++++++++++++++++---
 15 files changed, 547 insertions(+), 84 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
index 6458c825a..ee83d1232 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
@@ -23,6 +23,7 @@ import org.roaringbitmap.RoaringBitmap;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.Objects;
 
 /** A compressed bitmap for 32-bit integer. */
 public class RoaringBitmap32 {
@@ -67,4 +68,16 @@ public class RoaringBitmap32 {
     public void deserialize(DataInput in) throws IOException {
         roaringBitmap.deserialize(in);
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        RoaringBitmap32 that = (RoaringBitmap32) o;
+        return Objects.equals(this.roaringBitmap, that.roaringBitmap);
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java
index 2948c10d6..a2c592596 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java
@@ -24,6 +24,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.Objects;
 
 /**
  * A {@link DeletionVector} based on {@link RoaringBitmap32}, it only supports 
files with row count
@@ -104,4 +105,16 @@ public class BitmapDeletionVector implements 
DeletionVector {
                     "The file has too many rows, RoaringBitmap32 only supports 
files with row count not exceeding 2147483647.");
         }
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        BitmapDeletionVector that = (BitmapDeletionVector) o;
+        return Objects.equals(this.roaringBitmap, that.roaringBitmap);
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
index ce9ad40a1..461e092e0 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
@@ -22,6 +22,7 @@ import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.manifest.IndexManifestEntry;
 
 import javax.annotation.Nullable;
 
@@ -30,6 +31,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
 
@@ -138,6 +140,20 @@ public class DeletionVectorsMaintainer {
             return createOrRestore(deletionVectors);
         }
 
+        @VisibleForTesting
+        public DeletionVectorsMaintainer createOrRestore(
+                @Nullable Long snapshotId, BinaryRow partition) {
+            List<IndexFileMeta> indexFiles =
+                    snapshotId == null
+                            ? Collections.emptyList()
+                            : handler.scan(snapshotId, DELETION_VECTORS_INDEX, 
partition).stream()
+                                    .map(IndexManifestEntry::indexFile)
+                                    .collect(Collectors.toList());
+            Map<String, DeletionVector> deletionVectors =
+                    new HashMap<>(handler.readAllDeletionVectors(indexFiles));
+            return createOrRestore(deletionVectors);
+        }
+
         public DeletionVectorsMaintainer create() {
             return createOrRestore(new HashMap<>());
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java 
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
index 2d3dee20e..cdb10ae13 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
@@ -21,10 +21,12 @@ package org.apache.paimon.index;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.deletionvectors.DeletionVector;
+import org.apache.paimon.deletionvectors.DeletionVectorIndexFileMaintainer;
 import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.IndexManifestFile;
+import org.apache.paimon.table.source.DeletionFile;
 import org.apache.paimon.utils.IntIterator;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.PathFactory;
@@ -185,6 +187,11 @@ public class IndexFileHandler {
         indexManifestFile.delete(indexManifest);
     }
 
+    public DeletionVectorIndexFileMaintainer createDVIndexFileMaintainer(
+            Map<String, DeletionFile> dataFileToDeletionFiles) {
+        return new DeletionVectorIndexFileMaintainer(this, 
dataFileToDeletionFiles);
+    }
+
     public Map<String, DeletionVector> 
readAllDeletionVectors(List<IndexFileMeta> fileMetas) {
         Map<String, DeletionVector> deletionVectors = new HashMap<>();
         for (IndexFileMeta indexFile : fileMetas) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 25b320515..03fbcfae8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -184,7 +184,7 @@ public class SchemaValidation {
                                         field));
 
         if (options.deletionVectorsEnabled()) {
-            validateForDeletionVectors(schema, options);
+            validateForDeletionVectors(options);
         }
     }
 
@@ -461,11 +461,7 @@ public class SchemaValidation {
         }
     }
 
-    private static void validateForDeletionVectors(TableSchema schema, 
CoreOptions options) {
-        checkArgument(
-                !schema.primaryKeys().isEmpty(),
-                "Deletion vectors mode is only supported for tables with 
primary keys.");
-
+    private static void validateForDeletionVectors(CoreOptions options) {
         checkArgument(
                 options.changelogProducer() == ChangelogProducer.NONE
                         || options.changelogProducer() == 
ChangelogProducer.LOOKUP,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DeletionFile.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DeletionFile.java
index 13089bb02..94dfc6157 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DeletionFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DeletionFile.java
@@ -26,6 +26,7 @@ import org.apache.paimon.io.DataOutputView;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -44,7 +45,9 @@ import java.util.Optional;
  * </ul>
  */
 @Public
-public class DeletionFile {
+public class DeletionFile implements Serializable {
+
+    private static final long serialVersionUID = 1L;
 
     private final String path;
     private final long offset;
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
index e86f4caf6..37991d0be 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.spark
 
+import org.apache.paimon.spark.schema.PaimonMetadataColumn
 import org.apache.paimon.table.Table
 import org.apache.paimon.table.source.{DataSplit, Split}
 
@@ -25,12 +26,15 @@ import org.apache.spark.sql.connector.read.{Batch, Scan}
 import org.apache.spark.sql.types.StructType
 
 /** For internal use only. */
-case class PaimonSplitScan(table: Table, dataSplits: Array[DataSplit]) extends 
Scan {
-
+case class PaimonSplitScan(
+    table: Table,
+    dataSplits: Array[DataSplit],
+    metadataColumns: Seq[PaimonMetadataColumn] = Seq.empty)
+  extends Scan {
   override def readSchema(): StructType = 
SparkTypeUtils.fromPaimonRowType(table.rowType())
 
   override def toBatch: Batch = {
-    PaimonBatch(dataSplits.asInstanceOf[Array[Split]], table.newReadBuilder)
+    PaimonBatch(dataSplits.asInstanceOf[Array[Split]], table.newReadBuilder, 
metadataColumns)
   }
 
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index 7ce78e79c..86d2aee8b 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -20,13 +20,12 @@ package org.apache.paimon.spark.commands
 
 import org.apache.paimon.CoreOptions
 import org.apache.paimon.CoreOptions.MergeEngine
-import org.apache.paimon.spark.{InsertInto, SparkTable}
 import org.apache.paimon.spark.PaimonSplitScan
 import org.apache.paimon.spark.catalyst.Compatibility
 import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
 import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
 import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
-import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.table.{BucketMode, FileStoreTable}
 import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage}
 import org.apache.paimon.types.RowKind
 import org.apache.paimon.utils.RowDataPartitionComputer
@@ -56,7 +55,7 @@ case class DeleteFromPaimonTableCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
 
-    val commit = table.store.newCommit(UUID.randomUUID.toString)
+    val commit = fileStore.newCommit(UUID.randomUUID.toString)
     if (condition == null || condition == TrueLiteral) {
       commit.truncateTable(BatchWriteBuilder.COMMIT_IDENTIFIER)
     } else {
@@ -100,7 +99,7 @@ case class DeleteFromPaimonTableCommand(
         val commitMessages = if (usePrimaryKeyDelete()) {
           performPrimaryKeyDelete(sparkSession)
         } else {
-          performDeleteCopyOnWrite(sparkSession)
+          performNonPrimaryKeyDelete(sparkSession)
         }
         writer.commit(commitMessages)
       }
@@ -119,39 +118,70 @@ case class DeleteFromPaimonTableCommand(
     writer.write(df)
   }
 
-  def performDeleteCopyOnWrite(sparkSession: SparkSession): Seq[CommitMessage] 
= {
+  def performNonPrimaryKeyDelete(sparkSession: SparkSession): 
Seq[CommitMessage] = {
+    val pathFactory = fileStore.pathFactory()
     // Step1: the candidate data splits which are filtered by Paimon Predicate.
     val candidateDataSplits = findCandidateDataSplits(condition, 
relation.output)
-    val fileNameToMeta = candidateFileMap(candidateDataSplits)
+    val dataFilePathToMeta = candidateFileMap(candidateDataSplits)
 
-    // Step2: extract out the exactly files, which must have at least one 
record to be updated.
-    val touchedFilePaths = findTouchedFiles(candidateDataSplits, condition, 
relation, sparkSession)
+    if (deletionVectorsEnabled) {
+      // Step2: collect all the deletion vectors that mark the deleted rows.
+      val deletionVectors = collectDeletionVectors(
+        candidateDataSplits,
+        dataFilePathToMeta,
+        condition,
+        relation,
+        sparkSession)
+
+      deletionVectors.cache()
+      try {
+        // Step3: write these deletion vectors.
+        val newIndexCommitMsg = writer.persistDeletionVectors(deletionVectors)
+
+        // Step4: mark the touched index files as DELETE if needed.
+        val rewriteIndexCommitMsg = fileStore.bucketMode() match {
+          case BucketMode.BUCKET_UNAWARE =>
+            val indexEntries = getDeletedIndexFiles(dataFilePathToMeta, 
deletionVectors)
+            writer.buildCommitMessageFromIndexManifestEntry(indexEntries)
+          case _ =>
+            Seq.empty[CommitMessage]
+        }
 
-    // Step3: the smallest range of data files that need to be rewritten.
-    val touchedFiles = touchedFilePaths.map {
-      file => fileNameToMeta.getOrElse(file, throw new 
RuntimeException(s"Missing file: $file"))
-    }
+        newIndexCommitMsg ++ rewriteIndexCommitMsg
+      } finally {
+        deletionVectors.unpersist()
+      }
 
-    // Step4: build a dataframe that contains the unchanged data, and write 
out them.
-    val touchedDataSplits = SparkDataFileMeta.convertToDataSplits(
-      touchedFiles,
-      rawConvertible = true,
-      table.store().pathFactory())
-    val toRewriteScanRelation = Filter(
-      Not(condition),
-      Compatibility.createDataSourceV2ScanRelation(
-        relation,
-        PaimonSplitScan(table, touchedDataSplits),
-        relation.output))
-    val data = createDataset(sparkSession, toRewriteScanRelation)
+    } else {
+      // Step2: extract the exact files, which must have at least one 
record to be updated.
+      val touchedFilePaths =
+        findTouchedFiles(candidateDataSplits, condition, relation, 
sparkSession)
+
+      // Step3: the smallest range of data files that need to be rewritten.
+      val touchedFiles = touchedFilePaths.map {
+        file =>
+          dataFilePathToMeta.getOrElse(file, throw new 
RuntimeException(s"Missing file: $file"))
+      }
 
-    // only write new files, should have no compaction
-    val addCommitMessage = writer.writeOnly().write(data)
+      // Step4: build a dataframe that contains the unchanged data, and write 
it out.
+      val touchedDataSplits =
+        SparkDataFileMeta.convertToDataSplits(touchedFiles, rawConvertible = 
true, pathFactory)
+      val toRewriteScanRelation = Filter(
+        Not(condition),
+        Compatibility.createDataSourceV2ScanRelation(
+          relation,
+          PaimonSplitScan(table, touchedDataSplits),
+          relation.output))
+      val data = createDataset(sparkSession, toRewriteScanRelation)
 
-    // Step5: convert the deleted files that need to be wrote to commit 
message.
-    val deletedCommitMessage = buildDeletedCommitMessage(touchedFiles)
+      // only write new files, should have no compaction
+      val addCommitMessage = writer.writeOnly().write(data)
 
-    addCommitMessage ++ deletedCommitMessage
+      // Step5: convert the deleted files that need to be written into commit 
messages.
+      val deletedCommitMessage = buildDeletedCommitMessage(touchedFiles)
+
+      addCommitMessage ++ deletedCommitMessage
+    }
   }
 
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index 863310e55..915def57d 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -18,23 +18,27 @@
 
 package org.apache.paimon.spark.commands
 
+import org.apache.paimon.deletionvectors.{BitmapDeletionVector, DeletionVector}
+import org.apache.paimon.fs.Path
 import org.apache.paimon.index.IndexFileMeta
 import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement, 
IndexIncrement}
+import org.apache.paimon.manifest.IndexManifestEntry
 import org.apache.paimon.spark.{PaimonSplitScan, SparkFilterConverter}
 import org.apache.paimon.spark.catalyst.Compatibility
 import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
 import 
org.apache.paimon.spark.commands.SparkDataFileMeta.convertToSparkDataFileMeta
+import org.apache.paimon.spark.schema.PaimonMetadataColumn._
 import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl}
 import org.apache.paimon.table.source.DataSplit
 import org.apache.paimon.types.RowType
-import org.apache.paimon.utils.Preconditions
+import org.apache.paimon.utils.SerializationUtils
 
+import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.sql.PaimonUtils.createDataset
-import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
-import org.apache.spark.sql.catalyst.plans.logical.{Filter => 
FilterLogicalNode}
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
DataSourceV2ScanRelation}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter => 
FilterLogicalNode, Project}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.functions.input_file_name
 import org.apache.spark.sql.sources.{AlwaysTrue, And, EqualNullSafe, Filter}
 
@@ -135,17 +139,113 @@ trait PaimonCommand extends WithFileStoreTable with 
ExpressionHelper {
       .map(relativePath)
   }
 
+  /** Note that the key is a relative path, not just the file name. */
   protected def candidateFileMap(
       candidateDataSplits: Seq[DataSplit]): Map[String, SparkDataFileMeta] = {
-    val totalBuckets = table.coreOptions().bucket()
+    val totalBuckets = coreOptions.bucket()
     val candidateDataFiles = candidateDataSplits
       .flatMap(dataSplit => convertToSparkDataFileMeta(dataSplit, 
totalBuckets))
-    val fileStorePathFactory = table.store().pathFactory()
+    val fileStorePathFactory = fileStore.pathFactory()
     candidateDataFiles
       .map(file => (file.relativePath(fileStorePathFactory), file))
       .toMap
   }
 
+  protected def getDeletedIndexFiles(
+      dataFilePathToMeta: Map[String, SparkDataFileMeta],
+      newDeletionVectors: Dataset[SparkDeletionVectors]
+  ): Seq[IndexManifestEntry] = {
+    val deletionFiles = dataFilePathToMeta.flatMap {
+      case (relativePath, sdf) =>
+        sdf.deletionFile match {
+          case Some(deletionFile) =>
+            Some((relativePath, deletionFile))
+          case None => None
+        }
+    }
+    val dvIndexFileMaintainer = fileStore
+      .newIndexFileHandler()
+      .createDVIndexFileMaintainer(deletionFiles.asJava)
+
+    val pathFactory = fileStore.pathFactory()
+    val touchedDataFileAndDeletionFiles = newDeletionVectors
+      .collect()
+      .flatMap {
+        sdv =>
+          val relativePaths = sdv.relativePaths(pathFactory)
+          relativePaths.flatMap {
+            relativePath =>
+              dataFilePathToMeta(relativePath).deletionFile match {
+                case Some(deletionFile) => Some(relativePath, deletionFile)
+                case _ => None
+              }
+          }
+      }
+      .toMap
+
+    
dvIndexFileMaintainer.notifyDeletionFiles(touchedDataFileAndDeletionFiles.asJava)
+    dvIndexFileMaintainer.writeUnchangedDeletionVector().asScala
+  }
+  protected def collectDeletionVectors(
+      candidateDataSplits: Seq[DataSplit],
+      dataFilePathToMeta: Map[String, SparkDataFileMeta],
+      condition: Expression,
+      relation: DataSourceV2Relation,
+      sparkSession: SparkSession): Dataset[SparkDeletionVectors] = {
+    import sparkSession.implicits._
+
+    val dataFileAndDeletionFile = 
dataFilePathToMeta.mapValues(_.toSparkDeletionFile).toArray
+    val metadataCols = Seq(FILE_PATH, ROW_INDEX)
+    val metadataProj = metadataCols.map(_.toAttribute)
+    val scan = PaimonSplitScan(table, candidateDataSplits.toArray, 
metadataCols)
+    val filteredRelation = {
+      Project(
+        metadataProj,
+        FilterLogicalNode(
+          condition,
+          Compatibility.createDataSourceV2ScanRelation(
+            relation,
+            scan,
+            relation.output ++ metadataProj)))
+    }
+
+    val store = table.store()
+    val fileIO = table.fileIO()
+    val location = table.location
+    createDataset(sparkSession, filteredRelation)
+      .select(FILE_PATH_COLUMN, ROW_INDEX_COLUMN)
+      .as[(String, Long)]
+      .groupByKey(_._1)
+      .mapGroups {
+        case (filePath, iter) =>
+          val fileNameToDeletionFile = dataFileAndDeletionFile.toMap
+          val dv = new BitmapDeletionVector()
+          while (iter.hasNext) {
+            dv.delete(iter.next()._2)
+          }
+
+          val relativeFilePath = location.toUri.relativize(new 
URI(filePath)).toString
+          val sparkDeletionFile = fileNameToDeletionFile(relativeFilePath)
+          sparkDeletionFile.deletionFile match {
+            case Some(deletionFile) =>
+              dv.merge(DeletionVector.read(fileIO, deletionFile))
+            case None =>
+          }
+
+          val pathFactory = store.pathFactory()
+          val partitionAndBucket = pathFactory
+            .relativePartitionAndBucketPath(sparkDeletionFile.partition, 
sparkDeletionFile.bucket)
+            .toString
+
+          SparkDeletionVectors(
+            partitionAndBucket,
+            SerializationUtils.serializeBinaryRow(sparkDeletionFile.partition),
+            sparkDeletionFile.bucket,
+            Seq((new Path(filePath).getName, dv.serializeToBytes()))
+          )
+      }
+  }
+
   protected def buildDeletedCommitMessage(
       deletedFiles: Array[SparkDataFileMeta]): Seq[CommitMessage] = {
     deletedFiles
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index b9abbb02d..e97d88a0a 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -19,15 +19,17 @@
 package org.apache.paimon.spark.commands
 
 import org.apache.paimon.CoreOptions.WRITE_ONLY
+import org.apache.paimon.data.BinaryRow
+import org.apache.paimon.deletionvectors.{DeletionVector, 
DeletionVectorsMaintainer}
 import org.apache.paimon.index.{BucketAssigner, SimpleHashBucketAssigner}
+import org.apache.paimon.io.{CompactIncrement, DataIncrement, IndexIncrement}
+import org.apache.paimon.manifest.{FileKind, IndexManifestEntry}
 import org.apache.paimon.spark.{SparkRow, SparkTableWrite}
-import org.apache.paimon.spark.schema.SparkSystemColumns
 import org.apache.paimon.spark.schema.SparkSystemColumns.{BUCKET_COL, 
ROW_KIND_COL}
 import org.apache.paimon.spark.util.SparkRowUtils
 import org.apache.paimon.table.{BucketMode, FileStoreTable}
-import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage, 
CommitMessageSerializer, RowPartitionKeyExtractor}
-import org.apache.paimon.utils.Preconditions
-import org.apache.paimon.utils.Preconditions.checkArgument
+import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage, 
CommitMessageImpl, CommitMessageSerializer, RowPartitionKeyExtractor}
+import org.apache.paimon.utils.SerializationUtils
 
 import org.apache.spark.{Partitioner, TaskContext}
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
@@ -196,6 +198,94 @@ case class PaimonSparkWriter(table: FileStoreTable) {
       .toSeq
   }
 
+  /**
+   * Write all the deletion vectors to the index files. In bucket-unaware 
mode, one index file maps
+   * to one deletion vector; otherwise, one index file contains all deletion 
vectors with the same
+   * partition and bucket.
+   */
+  def persistDeletionVectors(deletionVectors: Dataset[SparkDeletionVectors]): 
Seq[CommitMessage] = {
+    val sparkSession = deletionVectors.sparkSession
+    import sparkSession.implicits._
+
+    val fileStore = table.store()
+    val lastSnapshotId = table.snapshotManager().latestSnapshotId()
+
+    def createOrRestoreDVMaintainer(
+        partition: BinaryRow,
+        bucket: Int): DeletionVectorsMaintainer = {
+      val deletionVectorsMaintainerFactory =
+        new DeletionVectorsMaintainer.Factory(fileStore.newIndexFileHandler())
+      fileStore.bucketMode() match {
+        case BucketMode.BUCKET_UNAWARE =>
+          deletionVectorsMaintainerFactory.create()
+        case _ =>
+          deletionVectorsMaintainerFactory.createOrRestore(lastSnapshotId, 
partition, bucket)
+      }
+    }
+
+    def commitDeletionVector(
+        sdv: SparkDeletionVectors,
+        serializer: CommitMessageSerializer): Array[Byte] = {
+      val partition = SerializationUtils.deserializeBinaryRow(sdv.partition)
+      val maintainer = createOrRestoreDVMaintainer(partition, sdv.bucket)
+      sdv.dataFileAndDeletionVector.foreach {
+        case (dataFileName, dv) =>
+          maintainer.notifyNewDeletion(dataFileName, 
DeletionVector.deserializeFromBytes(dv))
+      }
+
+      val commitMessage = new CommitMessageImpl(
+        partition,
+        sdv.bucket,
+        DataIncrement.emptyIncrement(),
+        CompactIncrement.emptyIncrement(),
+        new IndexIncrement(maintainer.prepareCommit()))
+
+      serializer.serialize(commitMessage)
+    }
+
+    val serializedCommits = fileStore.bucketMode() match {
+      case BucketMode.BUCKET_UNAWARE =>
+        deletionVectors.mapPartitions {
+          iter: Iterator[SparkDeletionVectors] =>
+            val serializer = new CommitMessageSerializer
+            iter.map(commitDeletionVector(_, serializer))
+        }
+      case _ =>
+        deletionVectors
+          .groupByKey(_.partitionAndBucket)
+          .mapGroups {
+            case (_, iter: Iterator[SparkDeletionVectors]) =>
+              val serializer = new CommitMessageSerializer
+              val grouped = iter
+                .reduce(
+                  (sdv1, sdv2) =>
+                    sdv1.copy(dataFileAndDeletionVector =
+                      sdv1.dataFileAndDeletionVector ++ 
sdv2.dataFileAndDeletionVector))
+              commitDeletionVector(grouped, serializer)
+          }
+    }
+    serializedCommits
+      .collect()
+      .map(deserializeCommitMessage(serializer, _))
+  }
+
+  def buildCommitMessageFromIndexManifestEntry(
+      indexManifestEntries: Seq[IndexManifestEntry]): Seq[CommitMessage] = {
+    indexManifestEntries
+      .groupBy(entry => (entry.partition(), entry.bucket()))
+      .map {
+        case ((partition, bucket), entries) =>
+          val (added, removed) = entries.partition(_.kind() == FileKind.ADD)
+          new CommitMessageImpl(
+            partition,
+            bucket,
+            DataIncrement.emptyIncrement(),
+            CompactIncrement.emptyIncrement(),
+            new IndexIncrement(added.map(_.indexFile()).asJava, 
removed.map(_.indexFile()).asJava))
+      }
+      .toSeq
+  }
+
   def commit(commitMessages: Seq[CommitMessage]): Unit = {
     val tableCommit = writeBuilder.newCommit()
     try {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
index 8299f6b8c..1e0f1d6d3 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
@@ -20,7 +20,8 @@ package org.apache.paimon.spark.commands
 
 import org.apache.paimon.data.BinaryRow
 import org.apache.paimon.io.DataFileMeta
-import org.apache.paimon.table.source.DataSplit
+import org.apache.paimon.spark.PaimonImplicits
+import org.apache.paimon.table.source.{DataSplit, DeletionFile}
 import org.apache.paimon.utils.FileStorePathFactory
 
 import scala.collection.JavaConverters._
@@ -29,7 +30,8 @@ case class SparkDataFileMeta(
     partition: BinaryRow,
     bucket: Int,
     totalBuckets: Int,
-    dataFileMeta: DataFileMeta) {
+    dataFileMeta: DataFileMeta,
+    deletionFile: Option[DeletionFile] = None) {
 
   def relativePath(fileStorePathFactory: FileStorePathFactory): String = {
     fileStorePathFactory
@@ -37,14 +39,30 @@ case class SparkDataFileMeta(
       .toUri
       .toString + "/" + dataFileMeta.fileName()
   }
+
+  def toSparkDeletionFile: SparkDeletionFile = {
+    SparkDeletionFile(partition, bucket, deletionFile)
+  }
 }
 
+case class SparkDeletionFile(partition: BinaryRow, bucket: Int, deletionFile: 
Option[DeletionFile])
+
 object SparkDataFileMeta {
   def convertToSparkDataFileMeta(
       dataSplit: DataSplit,
       totalBuckets: Int): Seq[SparkDataFileMeta] = {
+    import PaimonImplicits._
+
+    val dvFactory =
+      DeletionFile.factory(dataSplit.dataFiles(), 
dataSplit.deletionFiles().orElse(null))
     dataSplit.dataFiles().asScala.map {
-      file => SparkDataFileMeta(dataSplit.partition, dataSplit.bucket, 
totalBuckets, file)
+      file =>
+        SparkDataFileMeta(
+          dataSplit.partition,
+          dataSplit.bucket,
+          totalBuckets,
+          file,
+          dvFactory.create(file.fileName()))
     }
   }
 
@@ -56,10 +74,14 @@ object SparkDataFileMeta {
       .groupBy(file => (file.partition, file.bucket))
       .map {
         case ((partition, bucket), files) =>
+          val (dataFiles, deletionFiles) = files.map {
+            file => (file.dataFileMeta, file.deletionFile.orNull)
+          }.unzip
           new DataSplit.Builder()
             .withPartition(partition)
             .withBucket(bucket)
-            .withDataFiles(files.map(_.dataFileMeta).toList.asJava)
+            .withDataFiles(dataFiles.toList.asJava)
+            .withDataDeletionFiles(deletionFiles.toList.asJava)
             .rawConvertible(rawConvertible)
             .withBucketPath(pathFactory.bucketPath(partition, bucket).toString)
             .build()
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDeletionVectors.scala
similarity index 55%
copy from 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala
copy to 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDeletionVectors.scala
index 1d447281e..5f6c8c347 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDeletionVectors.scala
@@ -18,14 +18,23 @@
 
 package org.apache.paimon.spark.commands
 
-import org.apache.paimon.table.FileStoreTable
-import org.apache.paimon.types.RowType
+import org.apache.paimon.utils.{FileStorePathFactory, SerializationUtils}
 
-private[spark] trait WithFileStoreTable {
-
-  def table: FileStoreTable
-
-  def withPrimaryKeys: Boolean = !table.primaryKeys().isEmpty
-
-  def rowType: RowType = table.rowType()
+/**
+ * This class will be used as Dataset's pattern type. So here use Array[Byte] 
instead of BinaryRow
+ * or DeletionVector.
+ */
+case class SparkDeletionVectors(
+    partitionAndBucket: String,
+    partition: Array[Byte],
+    bucket: Int,
+    dataFileAndDeletionVector: Seq[(String, Array[Byte])]
+) {
+  def relativePaths(fileStorePathFactory: FileStorePathFactory): Seq[String] = 
{
+    val prefix = fileStorePathFactory
+      
.relativePartitionAndBucketPath(SerializationUtils.deserializeBinaryRow(partition),
 bucket)
+      .toUri
+      .toString + "/"
+    dataFileAndDeletionVector.map(prefix + _._1)
+  }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala
index 1d447281e..5b2e704da 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.spark.commands
 
+import org.apache.paimon.{CoreOptions, FileStore}
 import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.types.RowType
 
@@ -28,4 +29,10 @@ private[spark] trait WithFileStoreTable {
   def withPrimaryKeys: Boolean = !table.primaryKeys().isEmpty
 
   def rowType: RowType = table.rowType()
+
+  def coreOptions: CoreOptions = table.coreOptions()
+
+  def fileStore: FileStore[_] = table.store()
+
+  def deletionVectorsEnabled: Boolean = coreOptions.deletionVectorsEnabled()
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala
index 74ce7090d..a82454d98 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala
@@ -21,6 +21,7 @@ package org.apache.paimon.spark.schema
 import org.apache.paimon.spark.SparkTypeUtils
 import org.apache.paimon.types.DataField
 
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.connector.catalog.MetadataColumn
 import org.apache.spark.sql.types.{DataType, LongType, StringType}
 
@@ -30,6 +31,10 @@ case class PaimonMetadataColumn(id: Int, override val name: 
String, override val
   def toPaimonDataField: DataField = {
     new DataField(id, name, SparkTypeUtils.toPaimonType(dataType));
   }
+
+  def toAttribute: AttributeReference = {
+    AttributeReference(name, dataType)()
+  }
 }
 
 object PaimonMetadataColumn {
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
index f45a22cae..02d898f97 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
@@ -20,15 +20,139 @@ package org.apache.paimon.spark.sql
 
 import org.apache.paimon.data.BinaryRow
 import org.apache.paimon.deletionvectors.{DeletionVector, 
DeletionVectorsMaintainer}
+import org.apache.paimon.fs.Path
 import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.table.FileStoreTable
 
 import org.apache.spark.sql.Row
 import org.junit.jupiter.api.Assertions
 
+import scala.collection.JavaConverters._
 import scala.util.Random
 
 class DeletionVectorTest extends PaimonSparkTestBase {
 
+  import testImplicits._
+
+  bucketModes.foreach {
+    bucket =>
+      test(
+        s"Paimon DeletionVector: delete for append non-partitioned table with 
bucket = $bucket") {
+        withTable("T") {
+          val bucketKey = if (bucket > 1) {
+            ", 'bucket-key' = 'id'"
+          } else {
+            ""
+          }
+          spark.sql(
+            s"""
+               |CREATE TABLE T (id INT, name STRING)
+               |TBLPROPERTIES ('deletion-vectors.enabled' = 'true', 'bucket' = 
'$bucket' $bucketKey)
+               |""".stripMargin)
+
+          val table = loadTable("T")
+          val dvMaintainerFactory =
+            new 
DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler())
+
+          spark.sql("INSERT INTO T VALUES (1, 'a'), (2, 'b')")
+          val deletionVectors1 = getLatestDeletionVectors(table, 
dvMaintainerFactory)
+          Assertions.assertEquals(0, deletionVectors1.size)
+
+          val cond1 = "id = 2"
+          val rowMetaInfo1 = getFilePathAndRowIndex(cond1)
+          spark.sql(s"DELETE FROM T WHERE $cond1")
+          checkAnswer(spark.sql(s"SELECT * from T ORDER BY id"), Row(1, "a") 
:: Nil)
+          val deletionVectors2 = getLatestDeletionVectors(table, 
dvMaintainerFactory)
+          Assertions.assertEquals(1, deletionVectors2.size)
+          deletionVectors2
+            .foreach {
+              case (filePath, dv) =>
+                rowMetaInfo1(filePath).foreach(index => 
Assertions.assertTrue(dv.isDeleted(index)))
+            }
+
+          spark.sql("INSERT INTO T VALUES (2, 'bb'), (3, 'c'), (4, 'd')")
+          checkAnswer(
+            spark.sql(s"SELECT * from T ORDER BY id"),
+            Row(1, "a") :: Row(2, "bb") :: Row(3, "c") :: Row(4, "d") :: Nil)
+          val deletionVectors3 = getLatestDeletionVectors(table, 
dvMaintainerFactory)
+          Assertions.assertTrue(deletionVectors2 == deletionVectors3)
+
+          val cond2 = "id % 2 = 1"
+          spark.sql(s"DELETE FROM T WHERE $cond2")
+          checkAnswer(spark.sql(s"SELECT * from T ORDER BY id"), Row(2, "bb") 
:: Row(4, "d") :: Nil)
+        }
+      }
+  }
+
+  bucketModes.foreach {
+    bucket =>
+      test(s"Paimon DeletionVector: delete for append partitioned table with 
bucket = $bucket") {
+        withTable("T") {
+          val bucketKey = if (bucket > 1) {
+            ", 'bucket-key' = 'id'"
+          } else {
+            ""
+          }
+          spark.sql(
+            s"""
+               |CREATE TABLE T (id INT, name STRING, pt STRING)
+               |PARTITIONED BY(pt)
+               |TBLPROPERTIES ('deletion-vectors.enabled' = 'true', 'bucket' = 
'$bucket' $bucketKey)
+               |""".stripMargin)
+
+          val table = loadTable("T")
+          val dvMaintainerFactory =
+            new 
DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler())
+
+          spark.sql(
+            "INSERT INTO T VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c', 
'2025'), (4, 'd', '2025')")
+          val deletionVectors1 = getLatestDeletionVectors(table, 
dvMaintainerFactory)
+          Assertions.assertEquals(0, deletionVectors1.size)
+
+          val cond1 = "id = 2"
+          val rowMetaInfo1 = getFilePathAndRowIndex(cond1)
+          spark.sql(s"DELETE FROM T WHERE $cond1")
+          checkAnswer(
+            spark.sql(s"SELECT * from T ORDER BY id"),
+            Row(1, "a", "2024") :: Row(3, "c", "2025") :: Row(4, "d", "2025") 
:: Nil)
+          val deletionVectors2 =
+            getLatestDeletionVectors(
+              table,
+              dvMaintainerFactory,
+              Seq(BinaryRow.singleColumn("2024"), 
BinaryRow.singleColumn("2025")))
+          Assertions.assertEquals(1, deletionVectors2.size)
+          deletionVectors2
+            .foreach {
+              case (filePath, dv) =>
+                rowMetaInfo1(filePath).foreach(index => 
Assertions.assertTrue(dv.isDeleted(index)))
+            }
+
+          val cond2 = "id = 3"
+          val rowMetaInfo2 = rowMetaInfo1 ++ getFilePathAndRowIndex(cond2)
+          spark.sql(s"DELETE FROM T WHERE $cond2")
+          checkAnswer(
+            spark.sql(s"SELECT * from T ORDER BY id"),
+            Row(1, "a", "2024") :: Row(4, "d", "2025") :: Nil)
+          val deletionVectors3 =
+            getLatestDeletionVectors(
+              table,
+              dvMaintainerFactory,
+              Seq(BinaryRow.singleColumn("2024")))
+          Assertions.assertTrue(deletionVectors2 == deletionVectors3)
+          val deletionVectors4 =
+            getLatestDeletionVectors(
+              table,
+              dvMaintainerFactory,
+              Seq(BinaryRow.singleColumn("2024"), 
BinaryRow.singleColumn("2025")))
+          deletionVectors4
+            .foreach {
+              case (filePath, dv) =>
+                rowMetaInfo2(filePath).foreach(index => 
Assertions.assertTrue(dv.isDeleted(index)))
+            }
+        }
+      }
+  }
+
   test("Paimon deletionVector: deletion vector write verification") {
     withTable("T") {
       spark.sql(s"""
@@ -54,27 +178,20 @@ class DeletionVectorTest extends PaimonSparkTestBase {
       val dvMaintainerFactory =
         new 
DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler())
 
-      def restoreDeletionVectors(): java.util.Map[String, DeletionVector] = {
-        dvMaintainerFactory
-          .createOrRestore(table.snapshotManager().latestSnapshotId(), 
BinaryRow.EMPTY_ROW, 0)
-          .deletionVectors()
-      }
-
-      val deletionVectors1 = restoreDeletionVectors()
+      val deletionVectors1 = getLatestDeletionVectors(table, 
dvMaintainerFactory)
       // 1, 3 deleted, their row positions are 0, 2
-      Assertions.assertEquals(1, deletionVectors1.size())
+      Assertions.assertEquals(1, deletionVectors1.size)
       deletionVectors1
-        .entrySet()
-        .forEach(
-          e => {
-            Assertions.assertTrue(e.getValue.isDeleted(0))
-            Assertions.assertTrue(e.getValue.isDeleted(2))
-          })
+        .foreach {
+          case (_, dv) =>
+            Assertions.assertTrue(dv.isDeleted(0))
+            Assertions.assertTrue(dv.isDeleted(2))
+        }
 
       // Compact
       // f3 (1, 2, 3), no deletion
       spark.sql("CALL sys.compact('T')")
-      val deletionVectors2 = restoreDeletionVectors()
+      val deletionVectors2 = getLatestDeletionVectors(table, 
dvMaintainerFactory)
       // After compaction, deletionVectors should be empty
       Assertions.assertTrue(deletionVectors2.isEmpty)
 
@@ -86,15 +203,14 @@ class DeletionVectorTest extends PaimonSparkTestBase {
         spark.sql(s"SELECT * from T ORDER BY id"),
         Row(1, "a_new1") :: Row(2, "b_new2") :: Row(3, "c_new1") :: Nil)
 
-      val deletionVectors3 = restoreDeletionVectors()
+      val deletionVectors3 = getLatestDeletionVectors(table, 
dvMaintainerFactory)
       // 2 deleted, row positions is 1
-      Assertions.assertEquals(1, deletionVectors3.size())
+      Assertions.assertEquals(1, deletionVectors3.size)
       deletionVectors3
-        .entrySet()
-        .forEach(
-          e => {
-            Assertions.assertTrue(e.getValue.isDeleted(1))
-          })
+        .foreach {
+          case (_, dv) =>
+            Assertions.assertTrue(dv.isDeleted(1))
+        }
     }
   }
 
@@ -186,4 +302,36 @@ class DeletionVectorTest extends PaimonSparkTestBase {
       checkResult(dvTbl, resultTbl)
     }
   }
+
+  private def getPathName(path: String): String = {
+    new Path(path).getName
+  }
+
+  private def getLatestDeletionVectors(
+      table: FileStoreTable,
+      dvMaintainerFactory: DeletionVectorsMaintainer.Factory): Map[String, 
DeletionVector] = {
+    getLatestDeletionVectors(table, dvMaintainerFactory, 
Seq(BinaryRow.EMPTY_ROW))
+  }
+
+  private def getLatestDeletionVectors(
+      table: FileStoreTable,
+      dvMaintainerFactory: DeletionVectorsMaintainer.Factory,
+      partitions: Seq[BinaryRow]): Map[String, DeletionVector] = {
+    partitions.flatMap {
+      partition =>
+        dvMaintainerFactory
+          .createOrRestore(table.snapshotManager().latestSnapshotId(), 
partition)
+          .deletionVectors()
+          .asScala
+    }.toMap
+  }
+
+  private def getFilePathAndRowIndex(condition: String): Map[String, 
Array[Long]] = {
+    spark
+      .sql(s"SELECT __paimon_file_path, __paimon_row_index FROM T WHERE 
$condition ORDER BY id")
+      .as[(String, Long)]
+      .collect()
+      .groupBy(_._1)
+      .map(kv => (getPathName(kv._1), kv._2.map(_._2)))
+  }
 }


Reply via email to