This is an automated email from the ASF dual-hosted git repository.

loneylee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new a4b230bbbf [GLUTEN-9205][CH] Support deletion vector native write 
(#9248)
a4b230bbbf is described below

commit a4b230bbbfc34994d2ad838b1c15d7211168b3a7
Author: Shuai li <[email protected]>
AuthorDate: Wed Apr 9 11:18:13 2025 +0800

    [GLUTEN-9205][CH] Support deletion vector native write (#9248)
    
    * [GLUTEN-9205][CH] Support deletion vector native write
---
 .../gluten/vectorized/DeltaWriterJNIWrapper.java   |  44 ++
 .../gluten/sql/shims/delta32/Delta32Shims.scala    |   7 +
 .../commands/DMLWithDeletionVectorsHelper.scala    | 701 +++++++++++++++++++++
 .../execution/DeletionVectorWriteTransformer.scala | 175 +++++
 .../GlutenDeltaParquetDeletionVectorSuite.scala    | 118 +++-
 .../apache/gluten/component/CHDeltaComponent.scala |   7 +
 .../org/apache/gluten/sql/shims/DeltaShims.scala   |   6 +-
 .../AggregateFunctionDVRoaringBitmap.h             |   2 +-
 .../Delta/Bitmap/DeltaDVRoaringBitmapArray.cpp     |  16 +-
 .../Delta/Bitmap/DeltaDVRoaringBitmapArray.h       |   8 +-
 .../Storages/SubstraitSource/Delta/DeltaUtil.cpp   |  80 +++
 .../Storages/SubstraitSource/Delta/DeltaUtil.h     |  51 ++
 .../Storages/SubstraitSource/Delta/DeltaWriter.cpp | 252 ++++++++
 .../Storages/SubstraitSource/Delta/DeltaWriter.h   |  84 +++
 cpp-ch/local-engine/local_engine_jni.cpp           |  43 ++
 .../tests/gtest_clickhouse_roaring_bitmap.cpp      |   2 +-
 16 files changed, 1586 insertions(+), 10 deletions(-)

diff --git 
a/backends-clickhouse/src-delta-32/main/java/org/apache/gluten/vectorized/DeltaWriterJNIWrapper.java
 
b/backends-clickhouse/src-delta-32/main/java/org/apache/gluten/vectorized/DeltaWriterJNIWrapper.java
new file mode 100644
index 0000000000..ada976668c
--- /dev/null
+++ 
b/backends-clickhouse/src-delta-32/main/java/org/apache/gluten/vectorized/DeltaWriterJNIWrapper.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.vectorized;
+
+import org.apache.spark.sql.execution.DeletionVectorWriteTransformer;
+
+public class DeltaWriterJNIWrapper {
+
+    private DeltaWriterJNIWrapper() {
+        // utility class
+    }
+
+    public static native void registerNativeReference();
+
+    public static native long createDeletionVectorWriter(String tablePath, int 
prefix_length, long packingTargetSize);
+
+    public static native void deletionVectorWrite(long writer_address, long 
block_address);
+
+    public static native long deletionVectorWriteFinalize(long writer_address);
+
+    // call from native
+    public static String encodeUUID(String uuid, String randomPrefix) {
+        return DeletionVectorWriteTransformer.encodeUUID(uuid, randomPrefix);
+    }
+
+    // call from native
+    public static String decodeUUID(String encodedUuid) {
+        return DeletionVectorWriteTransformer.decodeUUID(encodedUuid);
+    }
+}
diff --git 
a/backends-clickhouse/src-delta-32/main/scala/org/apache/gluten/sql/shims/delta32/Delta32Shims.scala
 
b/backends-clickhouse/src-delta-32/main/scala/org/apache/gluten/sql/shims/delta32/Delta32Shims.scala
index 52da7b7883..0244bf3ffd 100644
--- 
a/backends-clickhouse/src-delta-32/main/scala/org/apache/gluten/sql/shims/delta32/Delta32Shims.scala
+++ 
b/backends-clickhouse/src-delta-32/main/scala/org/apache/gluten/sql/shims/delta32/Delta32Shims.scala
@@ -19,7 +19,10 @@ package org.apache.gluten.sql.shims.delta32
 import org.apache.gluten.execution.GlutenPlan
 import org.apache.gluten.extension.{DeltaExpressionExtensionTransformer, 
ExpressionExtensionTrait}
 import org.apache.gluten.sql.shims.DeltaShims
+import org.apache.gluten.vectorized.DeltaWriterJNIWrapper
 
+import org.apache.spark.SparkContext
+import org.apache.spark.api.plugin.PluginContext
 import org.apache.spark.sql.delta.DeltaParquetFileFormat
 import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
 import org.apache.spark.sql.delta.util.JsonUtils
@@ -41,6 +44,10 @@ class Delta32Shims extends DeltaShims {
     DeltaOptimizedWriterTransformer.from(plan)
   }
 
+  /** On driver start, asks [[DeltaWriterJNIWrapper]] to register its native reference. */
+  override def onDriverStart(sc: SparkContext, pc: PluginContext): Unit =
+    DeltaWriterJNIWrapper.registerNativeReference()
+
   override def registerExpressionExtension(): Unit = {
     
ExpressionExtensionTrait.registerExpressionExtension(DeltaExpressionExtensionTransformer())
   }
diff --git 
a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/DMLWithDeletionVectorsHelper.scala
 
b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/DMLWithDeletionVectorsHelper.scala
new file mode 100644
index 0000000000..889c28cf0c
--- /dev/null
+++ 
b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/DMLWithDeletionVectorsHelper.scala
@@ -0,0 +1,701 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta.commands
+
+import org.apache.gluten.config.GlutenConfig
+
+import java.util.UUID
+import scala.collection.generic.Sizing
+import org.apache.spark.sql.catalyst.expressions.aggregation.BitmapAggregator
+import org.apache.spark.sql.delta.{DeltaLog, DeltaParquetFileFormat, 
OptimisticTransaction, Snapshot}
+import org.apache.spark.sql.delta.DeltaParquetFileFormat._
+import org.apache.spark.sql.delta.actions.{AddFile, DeletionVectorDescriptor, 
FileAction}
+import org.apache.spark.sql.delta.deletionvectors.{RoaringBitmapArray, 
RoaringBitmapArrayFormat, StoredBitmap}
+import org.apache.spark.sql.delta.files.{TahoeBatchFileIndex, TahoeFileIndex}
+import org.apache.spark.sql.delta.metering.DeltaLogging
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.stats.StatsCollectionUtils
+import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore
+import org.apache.spark.sql.delta.util.{BinPackingIterator, DeltaEncoder, 
JsonUtils, PathWithFileSystem, Utils => DeltaUtils}
+import org.apache.spark.sql.delta.util.DeltaFileOperations.absolutePath
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.spark.paths.SparkPath
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.execution.DeletionVectorWriteTransformer
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.execution.datasources.FileFormat.{FILE_PATH, 
METADATA_NAME}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.functions.{col, lit}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{SerializableConfiguration, Utils => SparkUtils}
+
+/**
+ * Gluten overwrite Delta:
+ *
+ * This file is copied from Delta 3.2.1.
+ */
+
+/**
+ * Contains utility classes and methods for performing DML operations with Deletion Vectors.
+ */
+object DMLWithDeletionVectorsHelper extends DeltaCommand {
+  val SUPPORTED_DML_COMMANDS: Seq[String] = Seq("DELETE", "UPDATE")
+
+  /**
+   * Builds the DataFrame that is scanned for rows matching the DML condition. The plan is the
+   * given `target` with its file index swapped for `fileIndex`, which is generally a file list
+   * already pruned by stats-based data skipping.
+   */
+  def createTargetDfForScanningForMatches(
+      spark: SparkSession,
+      target: LogicalPlan,
+      fileIndex: TahoeFileIndex): DataFrame = {
+    val scanPlan = replaceFileIndex(spark, target, fileIndex)
+    Dataset.ofRows(spark, scanPlan)
+  }
+
+  /**
+   * Replace the file index in a logical plan and return the updated plan.
+   * It's a common pattern that, in Delta commands, we use data skipping to 
determine a subset of
+   * files that can be affected by the command, so we replace the whole-table 
file index in the
+   * original logical plan with a new index of potentially affected files, 
while everything else in
+   * the original plan, e.g., resolved references, remain unchanged.
+   *
+   * In addition we also request a metadata column and a row index column from 
the Scan to help
+   * generate the Deletion Vectors. When predicate pushdown is enabled, we 
only request the
+   * metadata column. This is because we can utilize _metadata.row_index 
instead of generating a
+   * custom one.
+   *
+   * @param spark the active spark session
+   * @param target the logical plan in which we replace the file index
+   * @param fileIndex the new file index
+   */
+  private def replaceFileIndex(
+      spark: SparkSession,
+      target: LogicalPlan,
+      fileIndex: TahoeFileIndex): LogicalPlan = {
+    val useMetadataRowIndex =
+      
spark.sessionState.conf.getConf(DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX)
+    // This is only used when predicate pushdown is disabled.
+    val rowIndexCol = AttributeReference(ROW_INDEX_COLUMN_NAME, 
ROW_INDEX_STRUCT_FIELD.dataType)()
+
+    // Assigned when the Delta relation below is rewritten and read by the Project case.
+    // transformUp rewrites children before their parents, so a Project sitting above the
+    // relation sees the assigned value; a Project visited earlier trips the guard below.
+    var fileMetadataCol: AttributeReference = null
+
+    val newTarget = target.transformUp {
+      case l @ LogicalRelation(
+        hfsr @ HadoopFsRelation(_, _, _, _, format: DeltaParquetFileFormat, 
_), _, _, _) =>
+        fileMetadataCol = format.createFileMetadataCol()
+        // Take the existing schema and add additional metadata columns
+        if (useMetadataRowIndex) {
+          // Only the file index is swapped; the file metadata column is appended to the output.
+          l.copy(
+            relation = hfsr.copy(location = fileIndex)(hfsr.sparkSession),
+            output = l.output :+ fileMetadataCol)
+        } else {
+          // The custom row index column is added to both the data schema and the output.
+          val newDataSchema =
+            StructType(hfsr.dataSchema).add(ROW_INDEX_STRUCT_FIELD)
+          val finalOutput = l.output ++ Seq(rowIndexCol, fileMetadataCol)
+          // Disable splitting and filter pushdown in order to generate the row-indexes.
+          val newFormat = format.copy(optimizationsEnabled = false)
+          val newBaseRelation = hfsr.copy(
+            location = fileIndex,
+            dataSchema = newDataSchema,
+            fileFormat = newFormat)(hfsr.sparkSession)
+
+          l.copy(relation = newBaseRelation, output = finalOutput)
+        }
+      case p @ Project(projectList, _) =>
+        if (fileMetadataCol == null) {
+          throw new IllegalStateException("File metadata column is not yet 
created.")
+        }
+        // Forward the extra columns through the projection so they stay visible above it.
+        val rowIndexColOpt = if (useMetadataRowIndex) None else 
Some(rowIndexCol)
+        val additionalColumns = Seq(fileMetadataCol) ++ rowIndexColOpt
+        p.copy(projectList = projectList ++ additionalColumns)
+    }
+    newTarget
+  }
+
+  /**
+   * Finds the target table files that contain rows satisfying the condition, each paired with
+   * a DV that indicates the rows marked as deleted from that file.
+   *
+   * Fails if `opName` is not one of [[SUPPORTED_DML_COMMANDS]] or if `fileIndex` is not a
+   * [[TahoeBatchFileIndex]].
+   */
+  def findTouchedFiles(
+      sparkSession: SparkSession,
+      txn: OptimisticTransaction,
+      hasDVsEnabled: Boolean,
+      deltaLog: DeltaLog,
+      targetDf: DataFrame,
+      fileIndex: TahoeFileIndex,
+      condition: Expression,
+      opName: String): Seq[TouchedFileWithDV] = {
+    require(
+      SUPPORTED_DML_COMMANDS.contains(opName),
+      s"Expecting opName to be one of ${SUPPORTED_DML_COMMANDS.mkString(", ")}, " +
+        s"but got '$opName'.")
+
+    recordDeltaOperation(deltaLog, opType = s"$opName.findTouchedFiles") {
+      // Only a batch file index exposes the explicit list of candidate AddFiles.
+      val addFiles = fileIndex match {
+        case batchIndex: TahoeBatchFileIndex => batchIndex.addFiles
+        case _ => throw new IllegalArgumentException("Unexpected file index found!")
+      }
+
+      val dvResults = DeletionVectorBitmapGenerator.buildRowIndexSetsForFilesMatchingCondition(
+        sparkSession,
+        txn,
+        hasDVsEnabled,
+        targetDf,
+        addFiles,
+        condition)
+
+      findFilesWithMatchingRows(
+        txn,
+        generateCandidateFileMap(txn.deltaLog.dataPath, addFiles),
+        dvResults)
+    }
+  }
+
+  /**
+   * Resolves every DV result to its AddFile in `nameToAddFileMap` and returns only the files
+   * in which rows were actually modified, i.e. the unchanged ones are dropped.
+   */
+  def findFilesWithMatchingRows(
+      txn: OptimisticTransaction,
+      nameToAddFileMap: Map[String, AddFile],
+      matchedFileRowIndexSets: Seq[DeletionVectorResult]): Seq[TouchedFileWithDV] = {
+    val tableRoot = txn.deltaLog.dataPath
+
+    // Pair each result with its AddFile; later phases need both pieces of information.
+    val touched = for (result <- matchedFileRowIndexSets) yield {
+      TouchedFileWithDV(
+        result.filePath,
+        getTouchedFile(tableRoot, result.filePath, nameToAddFileMap),
+        result.deletionVector,
+        result.matchedRowCount)
+    }
+
+    logTrace(
+      "findTouchedFiles: matched files:\n\t" + touched.map(_.inputFilePath).mkString("\n\t"))
+
+    touched.filter(file => !file.isUnchanged)
+  }
+
+  /**
+   * Turns the touched files into the file actions to commit, plus the operation metrics.
+   *
+   * Files whose rows are all modified are removed outright; every other file gets its new
+   * deletion vector attached through `removeRows`, which yields an (AddFile, RemoveFile) pair.
+   *
+   * @param spark        the active spark session
+   * @param touchedFiles files with their new deletion vectors
+   * @param snapshot     snapshot used to look up stats for the re-added files
+   * @return the actions (full removes, then re-added files with stats, then their removes)
+   *         and a map holding the `numModifiedRows`, `numRemovedFiles` and
+   *         `numDeletionVectors{Added,Removed,Updated}` metrics
+   */
+  def processUnmodifiedData(
+      spark: SparkSession,
+      touchedFiles: Seq[TouchedFileWithDV],
+      snapshot: Snapshot): (Seq[FileAction], Map[String, Long]) = {
+    val numModifiedRows: Long = touchedFiles.map(_.numberOfModifiedRows).sum
+
+    // Partition once: the fully replaced side also gives the removed-file count, so
+    // isFullyReplaced() is not evaluated a second time per file.
+    val (fullyRemovedFiles, notFullyRemovedFiles) = touchedFiles.partition(_.isFullyReplaced())
+    val numRemovedFiles: Long = fullyRemovedFiles.size
+
+    val timestamp = System.currentTimeMillis()
+    val fullyRemoved = fullyRemovedFiles.map(_.fileLogEntry.removeWithTimestamp(timestamp))
+
+    val dvUpdates = notFullyRemovedFiles.map { fileWithDVInfo =>
+      fileWithDVInfo.fileLogEntry.removeRows(
+        deletionVector = fileWithDVInfo.newDeletionVector,
+        updateStats = false)
+    }
+    val (dvAddFiles, dvRemoveFiles) = dvUpdates.unzip
+    val dvAddFilesWithStats = getActionsWithStats(spark, dvAddFiles, snapshot)
+
+    // Classify each (add, remove) pair by which side carries a deletion vector.
+    val (numDeletionVectorsAdded, removedByDvUpdates, numDeletionVectorsUpdated) =
+      dvUpdates.foldLeft((0L, 0L, 0L)) {
+        case ((added, removed, updated), (addFile, removeFile)) =>
+          (Option(addFile.deletionVector), Option(removeFile.deletionVector)) match {
+            case (Some(_), Some(_)) => (added, removed, updated + 1)
+            case (None, Some(_)) => (added, removed + 1, updated)
+            case (Some(_), None) => (added + 1, removed, updated)
+            case _ => (added, removed, updated)
+          }
+      }
+    // Fully removed files that carried a DV count as removed deletion vectors as well.
+    val numDeletionVectorsRemoved =
+      removedByDvUpdates + fullyRemoved.count(_.deletionVector != null)
+
+    val metricMap = Map(
+      "numModifiedRows" -> numModifiedRows,
+      "numRemovedFiles" -> numRemovedFiles,
+      "numDeletionVectorsAdded" -> numDeletionVectorsAdded,
+      "numDeletionVectorsRemoved" -> numDeletionVectorsRemoved,
+      "numDeletionVectorsUpdated" -> numDeletionVectorsUpdated)
+    (fullyRemoved ++ dvAddFilesWithStats ++ dvRemoveFiles, metricMap)
+  }
+
+  /**
+   * Fetch stats for `addFiles`.
+   *
+   * Returns the given AddFiles (which carry the new DV info) with their `stats` column
+   * replaced by wide-bounds stats taken from the snapshot or, for files that have no
+   * `numPhysicalRecords`, computed through [[StatsCollectionUtils.computeStats]].
+   * Returns an empty sequence when `addFilesWithNewDvs` is empty.
+   */
+  private def getActionsWithStats(
+      spark: SparkSession,
+      addFilesWithNewDvs: Seq[AddFile],
+      snapshot: Snapshot): Seq[AddFile] = {
+    import org.apache.spark.sql.delta.implicits._
+
+    if (addFilesWithNewDvs.isEmpty) return Seq.empty
+
+    val selectionPathAndStatsCols = Seq(col("path"), col("stats"))
+    val addFilesWithNewDvsDf = addFilesWithNewDvs.toDF(spark)
+
+    // These files originate from snapshot.filesForScan which resets column statistics.
+    // Since these objects don't carry stats and tags, if we were to use them as result actions
+    // of the operation directly, we'd effectively be removing all stats and tags. To resolve
+    // this we join the list of files with DVs with the log (allFiles) to retrieve statistics.
+    // This is expected to have better performance than supporting full stats retrieval
+    // in snapshot.filesForScan because it only affects a subset of the scanned files.
+
+    // Find the current metadata with stats for all files with new DV
+    val addFileWithStatsDf = snapshot.withStats
+      .join(addFilesWithNewDvsDf.select("path"), "path")
+
+    // Update the existing stats to set the tightBounds to false and also set the appropriate
+    // null count. We want to set the bounds before the AddFile has DV descriptor attached.
+    // Attaching the DV descriptor here causes wrong logical records computation in
+    // `updateStatsToWideBounds`.
+    val statsColName = snapshot.getBaseStatsColumnName
+    val addFilesWithWideBoundsDf = snapshot
+      .updateStatsToWideBounds(addFileWithStatsDf, statsColName)
+
+    val (filesWithNoStats, filesWithExistingStats) = {
+      // numRecords is the only stat we really have to guarantee.
+      // If the others are missing, we do not need to fetch them.
+      addFilesWithWideBoundsDf.as[AddFile].collect().toSeq
+        .partition(_.numPhysicalRecords.isEmpty)
+    }
+
+    // If we encounter files with no stats we fetch the stats from the parquet footer.
+    // Files with persistent DVs *must* have (at least numRecords) stats according to the
+    // Delta spec.
+    val filesWithFetchedStats =
+      if (filesWithNoStats.nonEmpty) {
+        StatsCollectionUtils.computeStats(spark,
+          conf = snapshot.deltaLog.newDeltaHadoopConf(),
+          dataPath = snapshot.deltaLog.dataPath,
+          addFiles = filesWithNoStats.toDS(spark),
+          numFilesOpt = Some(filesWithNoStats.size),
+          columnMappingMode = snapshot.metadata.columnMappingMode,
+          dataSchema = snapshot.dataSchema,
+          statsSchema = snapshot.statsSchema,
+          setBoundsToWide = true)
+          .collect()
+          .toSeq
+      } else {
+        Seq.empty
+      }
+
+    val allAddFilesWithUpdatedStats =
+      (filesWithExistingStats ++ filesWithFetchedStats).toSeq.toDF(spark)
+
+    // Now join the allAddFilesWithUpdatedStats with addFilesWithNewDvs
+    // so that the updated stats are joined with the new DV info
+    addFilesWithNewDvsDf.drop("stats")
+      .join(
+        allAddFilesWithUpdatedStats.select(selectionPathAndStatsCols: _*), 
"path")
+      .as[AddFile]
+      .collect()
+      .toSeq
+  }
+}
+
+object DeletionVectorBitmapGenerator {
+  final val FILE_NAME_COL = "filePath"
+  final val FILE_DV_ID_COL = "deletionVectorId"
+  final val ROW_INDEX_COL = "rowIndexCol"
+  final val DELETED_ROW_INDEX_BITMAP = "deletedRowIndexSet"
+  final val DELETED_ROW_INDEX_COUNT = "deletedRowIndexCount"
+  final val MAX_ROW_INDEX_COL = "maxRowIndexCol"
+
+  /**
+   * Aggregates the row indexes of `target` into one bitmap per
+   * ([[FILE_NAME_COL]], [[FILE_DV_ID_COL]]) group and turns each group into a
+   * [[DeletionVectorResult]].
+   *
+   * @param spark          the active spark session
+   * @param target         DataFrame providing the columns [[FILE_NAME_COL]],
+   *                       [[FILE_DV_ID_COL]] and [[ROW_INDEX_COL]]
+   * @param targetDeltaLog Delta log of the table; supplies the data path and the hadoop conf
+   * @param deltaTxn       the running transaction
+   */
+  private class DeletionVectorSet(
+    spark: SparkSession,
+    target: DataFrame,
+    targetDeltaLog: DeltaLog,
+    deltaTxn: OptimisticTransaction) {
+
+    /** Name of the aggregated struct column and the paths of its two sub-fields. */
+    case object CardinalityAndBitmapStruct {
+      val name: String = "CardinalityAndBitmapStruct"
+      def cardinality: String = s"$name.cardinality"
+      def bitmap: String = s"$name.bitmap"
+    }
+
+    /**
+     * Runs the aggregation and produces the results. When Gluten is enabled in the session
+     * conf the aggregated DataFrame is handed to [[DeletionVectorWriteTransformer.replace]];
+     * otherwise the bitmaps are stored per partition via [[bitmapStorageMapper]] and the
+     * results are collected.
+     */
+    def computeResult(): Seq[DeletionVectorResult] = {
+      val aggregated = target
+        .groupBy(col(FILE_NAME_COL), col(FILE_DV_ID_COL))
+        .agg(aggColumns.head, aggColumns.tail: _*)
+        .select(outputColumns: _*)
+
+      // --- modified start
+      if (spark.conf
+        .get(GlutenConfig.GLUTEN_ENABLED.key, 
GlutenConfig.GLUTEN_ENABLED.defaultValueString)
+        .toBoolean) {
+        DeletionVectorWriteTransformer.replace(aggregated, 
targetDeltaLog.dataPath, deltaTxn, spark)
+      } else {
+        import DeletionVectorResult.encoder
+        val rowIndexData = aggregated.as[DeletionVectorData]
+        val storedResults = rowIndexData.mapPartitions(bitmapStorageMapper())
+        storedResults.as[DeletionVectorResult].collect()
+      }
+      // --- modified end
+    }
+
+    /** The aggregate expressions: a single bitmap aggregator over [[ROW_INDEX_COL]]. */
+    protected def aggColumns: Seq[Column] = {
+      
Seq(createBitmapSetAggregator(col(ROW_INDEX_COL)).as(CardinalityAndBitmapStruct.name))
+    }
+
+    /** Create a bitmap set aggregator over the given column */
+    private def createBitmapSetAggregator(indexColumn: Column): Column = {
+      val func = new BitmapAggregator(indexColumn.expr, 
RoaringBitmapArrayFormat.Portable)
+      new Column(func.toAggregateExpression(isDistinct = false))
+    }
+
+    /** Output projection; the column names line up with the [[DeletionVectorData]] fields. */
+    protected def outputColumns: Seq[Column] =
+      Seq(
+        col(FILE_NAME_COL),
+        col(FILE_DV_ID_COL),
+        col(CardinalityAndBitmapStruct.bitmap).as(DELETED_ROW_INDEX_BITMAP),
+        col(CardinalityAndBitmapStruct.cardinality).as(DELETED_ROW_INDEX_COUNT)
+      )
+
+    /** Partition mapper that stores the aggregated bitmaps and returns their descriptors. */
+    protected def bitmapStorageMapper()
+      : Iterator[DeletionVectorData] => Iterator[DeletionVectorResult] = {
+      val prefixLen = DeltaUtils.getRandomPrefixLength(deltaTxn.metadata)
+      DeletionVectorWriter.createMapperToStoreDeletionVectors(
+        spark,
+        targetDeltaLog.newDeltaHadoopConf(),
+        targetDeltaLog.dataPath,
+        prefixLen)
+    }
+  }
+
+  /**
+   * Build bitmap compressed sets of row indices for each file in [[target]] using
+   * [[ROW_INDEX_COL]], store them, and collect one [[DeletionVectorResult]] per file.
+   *
+   * @param target DataFrame expected to provide [[FILE_NAME_COL]], [[FILE_DV_ID_COL]] and
+   *               [[ROW_INDEX_COL]].
+   */
+  def buildDeletionVectors(
+      spark: SparkSession,
+      target: DataFrame,
+      targetDeltaLog: DeltaLog,
+      deltaTxn: OptimisticTransaction): Seq[DeletionVectorResult] =
+    new DeletionVectorSet(spark, target, targetDeltaLog, deltaTxn).computeResult()
+
+  /**
+   * Filters `targetDf` by `condition`, tags each matching row with its file name and row
+   * index, attaches the existing DV descriptor of the file when the table has DVs, and
+   * delegates to [[buildDeletionVectors]].
+   *
+   * @param sparkSession      the active spark session
+   * @param txn               the running transaction; supplies the Delta log
+   * @param tableHasDVs       whether existing DV descriptors must be joined in
+   * @param targetDf          DataFrame scanning the candidate files
+   * @param candidateFiles    the files that may contain matching rows
+   * @param condition         predicate selecting the rows to mark
+   * @param fileNameColumnOpt overrides the column holding the file path; defaults to the
+   *                          metadata file path column
+   * @param rowIndexColumnOpt overrides the column holding the row index
+   */
+  def buildRowIndexSetsForFilesMatchingCondition(
+      sparkSession: SparkSession,
+      txn: OptimisticTransaction,
+      tableHasDVs: Boolean,
+      targetDf: DataFrame,
+      candidateFiles: Seq[AddFile],
+      condition: Expression,
+      fileNameColumnOpt: Option[Column] = None,
+      rowIndexColumnOpt: Option[Column] = None): Seq[DeletionVectorResult] = {
+    val useMetadataRowIndexConf = 
DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX
+    val useMetadataRowIndex = 
sparkSession.sessionState.conf.getConf(useMetadataRowIndexConf)
+    val fileNameColumn = 
fileNameColumnOpt.getOrElse(col(s"${METADATA_NAME}.${FILE_PATH}"))
+    // Default row index source: the metadata row index when enabled, else the custom column.
+    val rowIndexColumn = if (useMetadataRowIndex) {
+      
rowIndexColumnOpt.getOrElse(col(s"${METADATA_NAME}.${ParquetFileFormat.ROW_INDEX}"))
+    } else {
+      rowIndexColumnOpt.getOrElse(col(ROW_INDEX_COLUMN_NAME))
+    }
+    val matchedRowsDf = targetDf
+      .withColumn(FILE_NAME_COL, fileNameColumn)
+      // Filter after getting input file name as the filter might introduce a join and we
+      // cannot get input file name on join's output.
+      .filter(new Column(condition))
+      .withColumn(ROW_INDEX_COL, rowIndexColumn)
+
+    val df = if (tableHasDVs) {
+      // When the table already has DVs, join the `matchedRowDf` above to attach for each
+      // matched file its existing DeletionVectorDescriptor
+      val basePath = txn.deltaLog.dataPath.toString
+      val filePathToDV = candidateFiles.map { add =>
+        val serializedDV = Option(add.deletionVector).map(dvd => 
JsonUtils.toJson(dvd))
+        // Paths in the metadata column are canonicalized. Thus we must canonicalize the DV
+        // path.
+        FileToDvDescriptor(
+          SparkPath.fromPath(absolutePath(basePath, add.path)).urlEncoded,
+          serializedDV)
+      }
+      val filePathToDVDf = sparkSession.createDataset(filePathToDV)
+
+      val joinExpr = filePathToDVDf("path") === matchedRowsDf(FILE_NAME_COL)
+      // Perform leftOuter join to make sure we do not eliminate any rows because of path
+      // encoding issues. If there is such an issue we will detect it during the aggregation
+      // of the bitmaps.
+      val joinedDf = matchedRowsDf.join(filePathToDVDf, joinExpr, "leftOuter")
+        .drop(FILE_NAME_COL)
+        .withColumnRenamed("path", FILE_NAME_COL)
+      joinedDf
+    } else {
+      // When the table has no DVs, just add a column to indicate that the existing dv is null
+      matchedRowsDf.withColumn(FILE_DV_ID_COL, lit(null))
+    }
+
+    DeletionVectorBitmapGenerator.buildDeletionVectors(sparkSession, df, 
txn.deltaLog, txn)
+  }
+}
+
+/**
+ * Holds a mapping from a file path (url-encoded) to an (optional) serialized Deletion Vector
+ * descriptor.
+ *
+ * @param path             Url-encoded path of the data file.
+ * @param deletionVectorId JSON-serialized descriptor of the file's existing DV, if any.
+ */
+case class FileToDvDescriptor(path: String, deletionVectorId: Option[String])
+
+/** Supplies the implicit encoder for [[FileToDvDescriptor]]. */
+object FileToDvDescriptor {
+  // Created lazily on first use and shared by every call to `encoder`.
+  private lazy val _encoder = new DeltaEncoder[FileToDvDescriptor]
+  implicit def encoder: Encoder[FileToDvDescriptor] = _encoder.get
+}
+
+/**
+ * Row containing the file path and its new deletion vector bitmap in memory
+ *
+ * @param filePath             Absolute path of the data file this DV result is generated for.
+ * @param deletionVectorId     Existing [[DeletionVectorDescriptor]] serialized in JSON format.
+ *                             This info is used to load the existing DV with the new DV.
+ * @param deletedRowIndexSet   In-memory Deletion vector bitmap generated containing the newly
+ *                             deleted row indexes from data file.
+ * @param deletedRowIndexCount Count of rows marked as deleted using the [[deletedRowIndexSet]].
+ */
+case class DeletionVectorData(
+    filePath: String,
+    deletionVectorId: Option[String],
+    deletedRowIndexSet: Array[Byte],
+    deletedRowIndexCount: Long) extends Sizing {
+
+  /** The size of the bitmaps to use in [[BinPackingIterator]]. */
+  override def size: Int = deletedRowIndexSet.length
+}
+
+/** Supplies the implicit encoder and a convenience constructor for [[DeletionVectorData]]. */
+object DeletionVectorData {
+  // Created lazily on first use and shared by every call to `encoder`.
+  private lazy val _encoder = new DeltaEncoder[DeletionVectorData]
+  implicit def encoder: Encoder[DeletionVectorData] = _encoder.get
+
+  /** Builds a [[DeletionVectorData]] for a file that has no existing deletion vector. */
+  def apply(filePath: String, rowIndexSet: Array[Byte], rowIndexCount: Long): 
DeletionVectorData = {
+    DeletionVectorData(
+      filePath = filePath,
+      deletionVectorId = None,
+      deletedRowIndexSet = rowIndexSet,
+      deletedRowIndexCount = rowIndexCount)
+  }
+}
+
+/**
+ * Final output for each file containing the file path, DeletionVectorDescriptor and how many
+ * rows are marked as deleted in this file as part of this operation (doesn't include rows
+ * that are already marked as deleted).
+ *
+ * @param filePath        Absolute path of the data file this DV result is generated for.
+ * @param deletionVector  Deletion vector generated containing the newly deleted row indices
+ *                        from data file.
+ * @param matchedRowCount Number of rows marked as deleted using the [[deletionVector]].
+ */
+case class DeletionVectorResult(
+    filePath: String,
+    deletionVector: DeletionVectorDescriptor,
+    matchedRowCount: Long) {
+}
+
+/** Supplies the implicit encoder and a factory for [[DeletionVectorResult]]. */
+object DeletionVectorResult {
+  // Created lazily on first use and shared by every call to `encoder`.
+  private lazy val _encoder = new DeltaEncoder[DeletionVectorResult]
+  implicit def encoder: Encoder[DeletionVectorResult] = _encoder.get
+
+  /** Combines the path and row count of `data` with the descriptor of its stored DV. */
+  def fromDeletionVectorData(
+      data: DeletionVectorData,
+      deletionVector: DeletionVectorDescriptor): DeletionVectorResult = {
+    DeletionVectorResult(
+      filePath = data.filePath,
+      deletionVector = deletionVector,
+      matchedRowCount = data.deletedRowIndexCount)
+  }
+}
+
+/**
+ * A data file touched by the current operation together with its new deletion vector.
+ *
+ * @param inputFilePath     Path of the touched data file.
+ * @param fileLogEntry      The AddFile describing the data file.
+ * @param newDeletionVector Descriptor of the deletion vector produced by this operation.
+ * @param deletedRows       Row count reported alongside the new deletion vector.
+ */
+case class TouchedFileWithDV(
+    inputFilePath: String,
+    fileLogEntry: AddFile,
+    newDeletionVector: DeletionVectorDescriptor,
+    deletedRows: Long) {
+
+  /**
+   * The number of rows that are modified in this file: the cardinality of the new deletion
+   * vector minus the rows the file already had marked as deleted.
+   */
+  def numberOfModifiedRows: Long = {
+    val alreadyDeleted = fileLogEntry.numDeletedRecords
+    newDeletionVector.cardinality - alreadyDeleted
+  }
+
+  /**
+   * Checks if the file is unchanged by the current operation, i.e. no row has been updated
+   * or deleted. An unchanged file would lose no rows during the rewrite.
+   */
+  def isUnchanged: Boolean = numberOfModifiedRows == 0
+
+  /**
+   * Checks the *sufficient* condition for a file being fully replaced by the current
+   * operation (all rows are either being updated or deleted). Without statistics the
+   * defensive answer is `false`.
+   */
+  def isFullyReplaced(): Boolean =
+    fileLogEntry.numLogicalRecords.exists(numRecords => numRecords == numberOfModifiedRows)
+}
+
+/**
+ * Utility methods to write the deletion vector to storage. If a particular 
file already
+ * has an existing DV, it will be merged with the new deletion vector and 
written to storage.
+ */
+object DeletionVectorWriter extends DeltaLogging {
+  /**
+   * The context passed to the callback functions of [[createDeletionVectorMapper]]. Contains
+   * the DV writer that the callbacks use to write the new DVs.
+   *
+   * @param dvStore   the store the writer was created from
+   * @param writer    writer for the DV file shared by one bin of rows
+   * @param tablePath path of the table
+   * @param fileId    UUID of the DV file being written
+   * @param prefix    random prefix used for the DV file
+   */
+  case class DeletionVectorMapperContext(
+      dvStore: DeletionVectorStore,
+      writer: DeletionVectorStore.Writer,
+      tablePath: Path,
+      fileId: UUID,
+      prefix: String)
+
+  /**
+   * Prepare a mapper function for storing deletion vectors.
+   *
+   * The input rows are bin-packed up to the configured packing target size. For each bin the
+   * mapper creates one [[DeletionVectorMapperContext]] holding a DV writer, and `callbackFn`
+   * is invoked with that context for every row of the bin, so the rows of a bin share a file.
+   *
+   * The result can be used with [[org.apache.spark.sql.Dataset.mapPartitions()]] and must thus
+   * be serialized.
+   *
+   * @param sparkSession the active spark session
+   * @param hadoopConf   hadoop configuration, broadcast to the executors
+   * @param table        path of the table
+   * @param prefixLength length of the random prefix generated for each DV file
+   * @param callbackFn   turns one input row into an output, writing through the context
+   */
+  def createDeletionVectorMapper[InputT <: Sizing, OutputT](
+      sparkSession: SparkSession,
+      hadoopConf: Configuration,
+      table: Path,
+      prefixLength: Int)
+      (callbackFn: (DeletionVectorMapperContext, InputT) => OutputT)
+    : Iterator[InputT] => Iterator[OutputT] = {
+    val broadcastHadoopConf = sparkSession.sparkContext.broadcast(
+      new SerializableConfiguration(hadoopConf))
+    // hadoop.fs.Path is not Serializable, so close over the String representation instead
+    val tablePathString = DeletionVectorStore.pathToEscapedString(table)
+    val packingTargetSize =
+      sparkSession.conf.get(DeltaSQLConf.DELETION_VECTOR_PACKING_TARGET_SIZE)
+
+    // This is the (partition) mapper function we are returning
+    (rowIterator: Iterator[InputT]) => {
+      val dvStore = 
DeletionVectorStore.createInstance(broadcastHadoopConf.value.value)
+      val tablePath = DeletionVectorStore.escapedStringToPath(tablePathString)
+      val tablePathWithFS = dvStore.pathWithFileSystem(tablePath)
+
+      // Writes all rows of one bin through a single writer, i.e. into a single DV file.
+      val perBinFunction: Seq[InputT] => Seq[OutputT] = (rows: Seq[InputT]) => 
{
+        val prefix = DeltaUtils.getRandomPrefix(prefixLength)
+        val (writer, fileId) = createWriter(dvStore, tablePathWithFS, prefix)
+        val ctx = DeletionVectorMapperContext(
+          dvStore,
+          writer,
+          tablePath,
+          fileId,
+          prefix)
+        // tryWithResource closes the writer once every row of the bin has been processed.
+        val result = SparkUtils.tryWithResource(writer) { writer =>
+          rows.map(r => callbackFn(ctx, r))
+        }
+        result
+      }
+
+      val binPackedRowIterator = new BinPackingIterator(rowIterator, 
packingTargetSize)
+      binPackedRowIterator.flatMap(perBinFunction)
+    }
+  }
+
+  /**
+   * Creates a writer for writing multiple DVs in the same file.
+   *
+   * Returns the writer and the UUID of the new file.
+   */
+  def createWriter(
+      dvStore: DeletionVectorStore,
+      tablePath: PathWithFileSystem,
+      prefix: String = ""): (DeletionVectorStore.Writer, UUID) = {
+    val fileId = UUID.randomUUID()
+    val writer = 
dvStore.createWriter(dvStore.generateFileNameInTable(tablePath, fileId, prefix))
+    (writer, fileId)
+  }
+
+  /** Store the `bitmapData` on cloud storage. */
+  def storeSerializedBitmap(
+      ctx: DeletionVectorMapperContext,
+      bitmapData: Array[Byte],
+      cardinality: Long): DeletionVectorDescriptor = {
+    if (cardinality == 0L) {
+      DeletionVectorDescriptor.EMPTY
+    } else {
+      val dvRange = ctx.writer.write(bitmapData)
+      DeletionVectorDescriptor.onDiskWithRelativePath(
+        id = ctx.fileId,
+        randomPrefix = ctx.prefix,
+        sizeInBytes = bitmapData.length,
+        cardinality = cardinality,
+        offset = Some(dvRange.offset))
+    }
+  }
+
+  /**
+   * Prepares a mapper function that can be used by DML commands to store the Deletion Vectors
+   * that are described in [[DeletionVectorData]] and return their descriptors
+   * [[DeletionVectorResult]].
+   */
+  def createMapperToStoreDeletionVectors(
+      sparkSession: SparkSession,
+      hadoopConf: Configuration,
+      table: Path,
+      prefixLength: Int): Iterator[DeletionVectorData] => 
Iterator[DeletionVectorResult] =
+    createDeletionVectorMapper(sparkSession, hadoopConf, table, prefixLength) {
+      (ctx, row) => storeBitmapAndGenerateResult(ctx, row)
+    }
+
+  /**
+   * Helper to generate and store the deletion vector bitmap. The deletion 
vector is merged with
+   * the file's already existing deletion vector before being stored.
+   */
+  def storeBitmapAndGenerateResult(ctx: DeletionVectorMapperContext, row: 
DeletionVectorData)
+    : DeletionVectorResult = {
+    // If a group with null path exists it means there was an issue while 
joining with the log to
+    // fetch the DeletionVectorDescriptors.
+    assert(row.filePath != null,
+      s"""
+         |Encountered a non matched file path.
+         |It is likely that _metadata.file_path is not encoded by Spark as 
expected.
+         |""".stripMargin)
+
+    val fileDvDescriptor = 
row.deletionVectorId.map(DeletionVectorDescriptor.fromJson(_))
+    val finalDvDescriptor = fileDvDescriptor match {
+      case Some(existingDvDescriptor) if row.deletedRowIndexCount > 0 =>
+        // Load the existing bit map
+        val existingBitmap =
+          StoredBitmap.create(existingDvDescriptor, 
ctx.tablePath).load(ctx.dvStore)
+        val newBitmap = RoaringBitmapArray.readFrom(row.deletedRowIndexSet)
+
+        // Merge both the existing and new bitmaps into one, and finally 
persist on disk
+        existingBitmap.merge(newBitmap)
+        storeSerializedBitmap(
+          ctx,
+          
existingBitmap.serializeAsByteArray(RoaringBitmapArrayFormat.Portable),
+          existingBitmap.cardinality)
+      case Some(existingDvDescriptor) =>
+        existingDvDescriptor // This is already stored.
+      case None =>
+        // Persist the new bitmap
+        storeSerializedBitmap(ctx, row.deletedRowIndexSet, 
row.deletedRowIndexCount)
+    }
+    DeletionVectorResult.fromDeletionVectorData(row, deletionVector = 
finalDvDescriptor)
+  }
+}
+
diff --git 
a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/DeletionVectorWriteTransformer.scala
 
b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/DeletionVectorWriteTransformer.scala
new file mode 100644
index 0000000000..2715eff63e
--- /dev/null
+++ 
b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/DeletionVectorWriteTransformer.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.execution.ValidatablePlan
+import org.apache.gluten.extension.columnar.transition.Convention
+import org.apache.gluten.vectorized.{CHNativeBlock, DeltaWriterJNIWrapper}
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
+import org.apache.spark.sql.delta.OptimisticTransaction
+import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
+import org.apache.spark.sql.delta.commands.DeletionVectorResult
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore
+import org.apache.spark.sql.delta.util.{Codec, Utils => DeltaUtils}
+import org.apache.spark.sql.execution.datasources.CallTransformer
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.Utils
+
+import org.apache.hadoop.fs.Path
+
+import java.util.UUID
+import java.util.concurrent.atomic.AtomicLong
+
+/**
+ * Columnar operator that feeds every batch of a partition to the native deletion vector writer
+ * and emits the single batch returned when that writer is finalized.
+ *
+ * @param child
+ *   upstream plan; it must support columnar execution
+ * @param table
+ *   path of the Delta table, passed to the native writer in escaped-string form
+ * @param deltaTxn
+ *   transaction whose metadata determines the random file-prefix length
+ */
+case class DeletionVectorWriteTransformer(
+    child: SparkPlan,
+    table: Path,
+    deltaTxn: OptimisticTransaction)
+  extends UnaryExecNode
+  with ValidatablePlan {
+  override def output: Seq[Attribute] = Seq(
+    AttributeReference("filePath", StringType, nullable = false)(),
+    AttributeReference(
+      "deletionVector",
+      DeletionVectorWriteTransformer.deletionVectorType,
+      nullable = false
+    )(),
+    AttributeReference("matchedRowCount", LongType, nullable = false)()
+  )
+
+  /**
+   * Produces at most one output batch per partition: all input batches are written through one
+   * native writer, and the block returned by its finalize call is the result. An empty input
+   * partition yields an empty output partition.
+   */
+  override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    assert(child.supportsColumnar)
+    // Resolve everything on the driver so the task closure only captures plain values.
+    val prefixLen = DeltaUtils.getRandomPrefixLength(deltaTxn.metadata)
+    val tablePathString = DeletionVectorStore.pathToEscapedString(table)
+    val packingTargetSize =
+      session.conf.get(DeltaSQLConf.DELETION_VECTOR_PACKING_TARGET_SIZE)
+
+    child.executeColumnar().mapPartitions {
+      blockIterator =>
+        new Iterator[ColumnarBatch] {
+          // Explicit "result already produced" flag; this does not depend on the numeric value
+          // of the native writer handle.
+          private var consumed = false
+
+          override def hasNext: Boolean = !consumed && blockIterator.hasNext
+
+          override def next(): ColumnarBatch = {
+            if (!hasNext) {
+              // Honour the Iterator contract instead of creating another native writer.
+              throw new NoSuchElementException("deletion vector result already consumed")
+            }
+            consumed = true
+
+            val writer = DeltaWriterJNIWrapper
+              .createDeletionVectorWriter(tablePathString, prefixLen, packingTargetSize)
+            while (blockIterator.hasNext) {
+              val batch = blockIterator.next()
+              val blockAddress = CHNativeBlock.fromColumnarBatch(batch).blockAddress()
+              DeltaWriterJNIWrapper.deletionVectorWrite(writer, blockAddress)
+            }
+
+            val resultAddress = DeltaWriterJNIWrapper.deletionVectorWriteFinalize(writer)
+            new CHNativeBlock(resultAddress).toColumnarBatch
+          }
+        }
+    }
+  }
+
+  override protected def withNewChildInternal(
+      newChild: SparkPlan): DeletionVectorWriteTransformer =
+    copy(child = newChild)
+
+  override def batchType(): Convention.BatchType = BackendsApiManager.getSettings.primaryBatchType
+
+  override def rowType0(): Convention.RowType = Convention.RowType.None
+
+  /** Row-based execution is not supported; this operator is columnar only. */
+  override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException()
+}
+
+/** Helpers around the native deletion vector write path. */
+object DeletionVectorWriteTransformer {
+
+  /** Number of [[replace]] invocations; only incremented when `Utils.isTesting` is true. */
+  val COUNTER = new AtomicLong(0)
+
+  /**
+   * Layout of the `deletionVector` output column. [[replace]] reads the fields by ordinal, so
+   * the field order here must stay in sync with that method.
+   */
+  private val deletionVectorType: StructType = StructType.apply(
+    Seq(
+      StructField.apply("storageType", StringType, nullable = false),
+      StructField.apply("pathOrInlineDv", StringType, nullable = false),
+      StructField.apply("offset", IntegerType, nullable = true),
+      StructField.apply("sizeInBytes", IntegerType, nullable = false),
+      StructField.apply("cardinality", LongType, nullable = false),
+      StructField.apply("maxRowIndex", LongType, nullable = true)
+    ))
+
+  /**
+   * Base85-encodes `uuid` and prepends `randomPrefix`.
+   *
+   * @param uuid
+   *   textual UUID, parsed with `UUID.fromString`
+   * @param randomPrefix
+   *   prefix placed in front of the encoded UUID; may be empty
+   * @return
+   *   `randomPrefix` followed by the 20-character encoded UUID
+   */
+  def encodeUUID(uuid: String, randomPrefix: String): String = {
+    val uuidData = Codec.Base85Codec.encodeUUID(UUID.fromString(uuid))
+    // This should always be true and we are relying on it for separating out the
+    // prefix again later without having to spend an extra character as a separator.
+    assert(uuidData.length == 20)
+    s"$randomPrefix$uuidData"
+  }
+
+  /** Decodes a Base85-encoded UUID and returns its textual form. */
+  def decodeUUID(encodedUuid: String): String = {
+    Codec.Base85Codec.decodeUUID(encodedUuid).toString
+  }
+
+  /**
+   * Executes the physical plan of `aggregated` underneath a [[DeletionVectorWriteTransformer]]
+   * and converts every collected row into a [[DeletionVectorResult]].
+   *
+   * @param aggregated
+   *   DataFrame whose `queryExecution.sparkPlan` becomes the child of the write operator
+   * @param tablePath
+   *   path of the Delta table
+   * @param deltaTxn
+   *   transaction handed to the write operator
+   * @param spark
+   *   session used to plan and run the wrapped operator
+   */
+  def replace(
+      aggregated: DataFrame,
+      tablePath: Path,
+      deltaTxn: OptimisticTransaction,
+      spark: SparkSession): Seq[DeletionVectorResult] = {
+    if (Utils.isTesting) {
+      COUNTER.incrementAndGet()
+    }
+
+    val writeExec =
+      DeletionVectorWriteTransformer(aggregated.queryExecution.sparkPlan, tablePath, deltaTxn)
+    val collected = CallTransformer(spark, writeExec).executedPlan.executeCollect()
+
+    // Reads a nullable struct field, mapping SQL NULL to None.
+    def optionalField[T](struct: InternalRow, ordinal: Int)(read: Int => T): Option[T] =
+      if (struct.isNullAt(ordinal)) None else Some(read(ordinal))
+
+    def internalRowToDeletionVectorResult(row: InternalRow): DeletionVectorResult = {
+      val filePath = row.getString(0)
+      // Take the arity from the schema instead of repeating it as a literal.
+      val deletionVector = row.getStruct(1, deletionVectorType.length)
+      val matchedRowCount = row.getLong(2)
+      val offset: Option[Int] = optionalField(deletionVector, 2)(deletionVector.getInt)
+      val maxRowIndex: Option[Long] = optionalField(deletionVector, 5)(deletionVector.getLong)
+
+      DeletionVectorResult(
+        filePath,
+        DeletionVectorDescriptor(
+          deletionVector.getString(0),
+          deletionVector.getString(1),
+          offset,
+          deletionVector.getInt(3),
+          deletionVector.getLong(4),
+          maxRowIndex
+        ),
+        matchedRowCount
+      )
+    }
+
+    collected.map(internalRowToDeletionVectorResult).toSeq
+  }
+}
diff --git 
a/backends-clickhouse/src-delta-32/test/scala/org/apache/spark/gluten/delta/GlutenDeltaParquetDeletionVectorSuite.scala
 
b/backends-clickhouse/src-delta-32/test/scala/org/apache/spark/gluten/delta/GlutenDeltaParquetDeletionVectorSuite.scala
index b96e02af2c..cc9fd16c83 100644
--- 
a/backends-clickhouse/src-delta-32/test/scala/org/apache/spark/gluten/delta/GlutenDeltaParquetDeletionVectorSuite.scala
+++ 
b/backends-clickhouse/src-delta-32/test/scala/org/apache/spark/gluten/delta/GlutenDeltaParquetDeletionVectorSuite.scala
@@ -21,12 +21,18 @@ import 
org.apache.gluten.execution.{FileSourceScanExecTransformer, GlutenClickHo
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.Column
 import org.apache.spark.sql.catalyst.expressions.aggregation.BitmapAggregator
+import org.apache.spark.sql.delta.DeltaConfigs
+import org.apache.spark.sql.delta.DeltaLog
 import org.apache.spark.sql.delta.deletionvectors.RoaringBitmapArrayFormat
 import org.apache.spark.sql.delta.files.TahoeFileIndex
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.execution.DeletionVectorWriteTransformer
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec
 import org.apache.spark.sql.functions.col
 
+import org.apache.hadoop.fs.Path
+
 // Some sqls' line length exceeds 100
 // scalastyle:off line.size.limit
 
@@ -52,7 +58,6 @@ class GlutenDeltaParquetDeletionVectorSuite
       .set("spark.sql.adaptive.enabled", "true")
       .set("spark.sql.files.maxPartitionBytes", "20000000")
       .set("spark.sql.storeAssignmentPolicy", "legacy")
-      // .setCHConfig("use_local_format", true)
       .set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
   }
 
@@ -263,6 +268,117 @@ class GlutenDeltaParquetDeletionVectorSuite
     }
   }
 
+  def testBasic(prefix: Boolean = false)(deleteCallback: => 
Unit)(updateCallback: => Unit): Unit = {
+    val prefix_str = if (prefix) {
+      ", delta.randomizeFilePrefixes=true"
+    } else {
+      ""
+    }
+
+    val tableName = "dv_write_test"
+    withTable(tableName) {
+      withTempDir {
+        dirName =>
+          spark.sql(s"""
+                       |CREATE TABLE IF NOT EXISTS $tableName
+                       |($q1SchemaString)
+                       |USING delta
+                       |TBLPROPERTIES (delta.enableDeletionVectors='true' 
$prefix_str)
+                       |LOCATION '$dirName/$tableName'
+                       |""".stripMargin)
+
+          spark.sql(s"""insert into table $tableName select * from lineitem 
""".stripMargin)
+
+          spark.sql(s"""
+                       | delete from $tableName
+                       | where mod(l_orderkey, 3) = 1 and l_orderkey < 100
+                       |""".stripMargin)
+
+          deleteCallback
+
+          val df = spark.sql(s"""select sum(l_linenumber) from $tableName 
""".stripMargin)
+          val result = df.collect()
+          assertResult(1802335)(result.apply(0).get(0))
+
+          spark.sql(s"""update $tableName set l_orderkey = 1 where l_orderkey 
> 0 """.stripMargin)
+
+          updateCallback
+          val df2 = spark.sql(s"""select sum(l_orderkey) from 
$tableName""".stripMargin)
+          val result2 = df2.collect()
+          assertResult(600536)(result2.apply(0).get(0))
+      }
+    }
+  }
+
+  test("test delta DV write use native writer") {
+    var counter = DeletionVectorWriteTransformer.COUNTER.get()
+    testBasic() {
+      counter = DeletionVectorWriteTransformer.COUNTER.get()
+      assertResult(true)(counter > 0)
+    } {
+      assertResult(true)(DeletionVectorWriteTransformer.COUNTER.get() > 
counter)
+    }
+  }
+
+  test("test delta DV write use native writer with prefix") {
+    var counter = DeletionVectorWriteTransformer.COUNTER.get()
+    testBasic(prefix = true) {
+      counter = DeletionVectorWriteTransformer.COUNTER.get()
+      assertResult(true)(counter > 0)
+    } {
+      assertResult(true)(DeletionVectorWriteTransformer.COUNTER.get() > 
counter)
+    }
+  }
+
+  for (targetDVFileSize <- Seq(2, 200, 2000000)) {
+    test(
+      s"DELETE with DVs - packing multiple DVs into one file: target max DV 
file " +
+        s"size=$targetDVFileSize") {
+      withSQLConf(
+        DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.defaultTablePropertyKey 
-> "true",
+        DeltaSQLConf.DELETE_USE_PERSISTENT_DELETION_VECTORS.key -> "true",
+        DeltaSQLConf.DELETION_VECTOR_PACKING_TARGET_SIZE.key -> 
targetDVFileSize.toString,
+        "spark.sql.shuffle.partitions" -> "1"
+      ) {
+        withTempDir {
+          dirName =>
+            // Create table with 100 files of 2 rows each.
+            val numFiles = 100
+            val path = dirName.getAbsolutePath
+            spark.range(0, 200, step = 1, numPartitions = 
numFiles).write.format("delta").save(path)
+            val tableName = s"delta.`$path`"
+            val numFilesWithDVs = 10
+            val numDeletedRows = numFilesWithDVs * 1
+            spark.sql(s"DELETE FROM $tableName WHERE id % 2 = 0 AND id < 20")
+
+            // Verify the expected number of AddFiles with DVs
+            val allFiles = DeltaLog.forTable(spark, 
path).unsafeVolatileSnapshot.allFiles.collect()
+            assert(allFiles.length === numFiles)
+            val addFilesWithDV = allFiles.filter(_.deletionVector != null)
+            assert(addFilesWithDV.length === numFilesWithDVs)
+            assert(addFilesWithDV.map(_.deletionVector.cardinality).sum == 
numDeletedRows)
+
+            var expectedDVFileCount = 0
+            targetDVFileSize match {
+              // Each AddFile will have its own DV file
+              case 2 => expectedDVFileCount = numFilesWithDVs
+              // Each DV size is about 34 bytes according to the latest format.
+              case 200 => expectedDVFileCount = numFilesWithDVs / (200 / 
34).floor.toInt
+              // Expect all DVs in one file
+              case 2000000 => expectedDVFileCount = 1
+              case default =>
+                throw new IllegalStateException(s"Unknown target DV file size: 
$default")
+            }
+            // Expect the DVs to be packed into exactly expectedDVFileCount distinct files
+            assert(
+              addFilesWithDV.map(_.deletionVector.absolutePath(new 
Path(path))).toSet.size ===
+                expectedDVFileCount
+            )
+        }
+      }
+    }
+  }
+
   test("test parquet partition table delete with the delta DV") {
     withSQLConf(("spark.sql.sources.partitionOverwriteMode", "dynamic")) {
       spark.sql(s"""
diff --git 
a/backends-clickhouse/src-delta/main/scala/org/apache/gluten/component/CHDeltaComponent.scala
 
b/backends-clickhouse/src-delta/main/scala/org/apache/gluten/component/CHDeltaComponent.scala
index 1b187e4a4b..8acdcee777 100644
--- 
a/backends-clickhouse/src-delta/main/scala/org/apache/gluten/component/CHDeltaComponent.scala
+++ 
b/backends-clickhouse/src-delta/main/scala/org/apache/gluten/component/CHDeltaComponent.scala
@@ -23,11 +23,18 @@ import 
org.apache.gluten.extension.columnar.validator.Validators
 import org.apache.gluten.extension.injector.Injector
 import org.apache.gluten.sql.shims.DeltaShimLoader
 
+import org.apache.spark.SparkContext
+import org.apache.spark.api.plugin.PluginContext
+
 class CHDeltaComponent extends Component {
   override def name(): String = "ch-delta"
   override def buildInfo(): Component.BuildInfo =
     Component.BuildInfo("CHDelta", "N/A", "N/A", "N/A")
   override def dependencies(): Seq[Class[_ <: Component]] = classOf[CHBackend] 
:: Nil
+
+  override def onDriverStart(sc: SparkContext, pc: PluginContext): Unit =
+    DeltaShimLoader.getDeltaShims.onDriverStart(sc, pc)
+
   override def injectRules(injector: Injector): Unit = {
     val legacy = injector.gluten.legacy
     legacy.injectTransform {
diff --git 
a/backends-clickhouse/src-delta/main/scala/org/apache/gluten/sql/shims/DeltaShims.scala
 
b/backends-clickhouse/src-delta/main/scala/org/apache/gluten/sql/shims/DeltaShims.scala
index ee263910dd..b88c8718cd 100644
--- 
a/backends-clickhouse/src-delta/main/scala/org/apache/gluten/sql/shims/DeltaShims.scala
+++ 
b/backends-clickhouse/src-delta/main/scala/org/apache/gluten/sql/shims/DeltaShims.scala
@@ -18,13 +18,13 @@ package org.apache.gluten.sql.shims
 
 import org.apache.gluten.execution.GlutenPlan
 
+import org.apache.spark.SparkContext
+import org.apache.spark.api.plugin.PluginContext
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.PartitionedFile
 
 import java.util.{HashMap => JHashMap, Map => JMap}
 
-sealed abstract class ShimDescriptor
-
 trait DeltaShims {
   def supportDeltaOptimizedWriterExec(plan: SparkPlan): Boolean = false
 
@@ -33,6 +33,8 @@ trait DeltaShims {
       s"Can't transform ColumnarDeltaOptimizedWriterExec from 
${plan.getClass.getSimpleName}")
   }
 
+  def onDriverStart(sc: SparkContext, pc: PluginContext): Unit = {}
+
   def registerExpressionExtension(): Unit = {}
 
   def convertRowIndexFilterIdEncoded(
diff --git 
a/cpp-ch/local-engine/AggregateFunctions/AggregateFunctionDVRoaringBitmap.h 
b/cpp-ch/local-engine/AggregateFunctions/AggregateFunctionDVRoaringBitmap.h
index a60184a5be..de338ce6f0 100644
--- a/cpp-ch/local-engine/AggregateFunctions/AggregateFunctionDVRoaringBitmap.h
+++ b/cpp-ch/local-engine/AggregateFunctions/AggregateFunctionDVRoaringBitmap.h
@@ -36,7 +36,7 @@ struct AggregateFunctionDVRoaringBitmapData
 
     void insertResultInto(DB::ColumnInt64 & cardinality, DB::ColumnInt64 & 
last, DB::ColumnString & bitmap)
     {
-        cardinality.getData().push_back(roaring_bitmap_array.rb_size());
+        cardinality.getData().push_back(roaring_bitmap_array.cardinality());
         auto last_value = roaring_bitmap_array.last();
         if (last_value.has_value())
             last.getData().push_back(last_value.value());
diff --git 
a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/Bitmap/DeltaDVRoaringBitmapArray.cpp
 
b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/Bitmap/DeltaDVRoaringBitmapArray.cpp
index e238a2b362..c45d8d787d 100644
--- 
a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/Bitmap/DeltaDVRoaringBitmapArray.cpp
+++ 
b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/Bitmap/DeltaDVRoaringBitmapArray.cpp
@@ -17,13 +17,14 @@
 #include "DeltaDVRoaringBitmapArray.h"
 
 #include <zlib.h>
+#include <IO/ReadBufferFromString.h>
 #include <IO/ReadHelpers.h>
 #include <IO/WriteHelpers.h>
+#include <Storages/SubstraitSource/ReadBufferBuilder.h>
+#include <substrait/plan.pb.h>
 #include <roaring.hh>
 #include <Poco/URI.h>
 #include <Common/PODArray.h>
-#include <substrait/plan.pb.h>
-#include <Storages/SubstraitSource/ReadBufferBuilder.h>
 
 namespace DB
 {
@@ -82,7 +83,7 @@ void DeltaDVRoaringBitmapArray::rb_read(const String & 
file_path, Int32 offset,
         throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Checksum 
mismatch.");
 }
 
-UInt64 DeltaDVRoaringBitmapArray::rb_size() const
+UInt64 DeltaDVRoaringBitmapArray::cardinality() const
 {
     UInt64 sum = 0;
     for (const auto & r : roaring_bitmap_array)
@@ -137,6 +138,15 @@ void DeltaDVRoaringBitmapArray::rb_merge(const 
DeltaDVRoaringBitmapArray & that)
 {
     rb_or(that);
 }
+
+/// Merges a serialized bitmap array into this one.
+/// `that` is parsed with deserialize() into a temporary DeltaDVRoaringBitmapArray, which is
+/// then combined with this object through rb_merge().
+void DeltaDVRoaringBitmapArray::merge(const String & that)
+{
+    DB::ReadBufferFromString rb(that);
+    DeltaDVRoaringBitmapArray that_bitmap;
+    that_bitmap.deserialize(rb);
+    rb_merge(that_bitmap);
+}
+
 void DeltaDVRoaringBitmapArray::rb_or(const DeltaDVRoaringBitmapArray & that)
 {
     if (roaring_bitmap_array.size() < that.roaring_bitmap_array.size())
diff --git 
a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/Bitmap/DeltaDVRoaringBitmapArray.h
 
b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/Bitmap/DeltaDVRoaringBitmapArray.h
index 067ec5db32..a2d80080e1 100644
--- 
a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/Bitmap/DeltaDVRoaringBitmapArray.h
+++ 
b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/Bitmap/DeltaDVRoaringBitmapArray.h
@@ -24,11 +24,14 @@
 
 namespace local_engine
 {
+
+static constexpr auto DV_FILE_FORMAT_VERSION_ID_V1 = std::byte{1};
+
 /**
   * Roaring bitmap data.
   * For a description of the roaring_bitmap_t, see: 
https://github.com/RoaringBitmap/CRoaring
   */
-class DeltaDVRoaringBitmapArray : private boost::noncopyable
+class DeltaDVRoaringBitmapArray
 {
     static constexpr Int64 MAX_REPRESENTABLE_VALUE
         = (static_cast<UInt64>(INT32_MAX - 1) << 32) | 
(static_cast<UInt64>(INT32_MIN) & 0xFFFFFFFFL);
@@ -43,13 +46,14 @@ public:
     explicit DeltaDVRoaringBitmapArray();
     ~DeltaDVRoaringBitmapArray() = default;
     bool operator==(const DeltaDVRoaringBitmapArray & other) const;
-    UInt64 rb_size() const;
+    UInt64 cardinality() const;
     void rb_read(const String & file_path, Int32 offset, Int32 data_size, 
DB::ContextPtr context);
     bool rb_contains(Int64 x) const;
     bool rb_is_empty() const;
     void rb_clear();
     void rb_add(Int64 value);
     void rb_merge(const DeltaDVRoaringBitmapArray & that);
+    void merge(const String & that);
     void rb_or(const DeltaDVRoaringBitmapArray & that);
     String serialize() const;
     void deserialize(DB::ReadBuffer & buf);
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaUtil.cpp 
b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaUtil.cpp
new file mode 100644
index 0000000000..725e3446a2
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaUtil.cpp
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "DeltaUtil.h"
+
+#include <stdexcept>
+#include <Compression/CompressedReadBuffer.h>
+#include <Core/Block.h>
+#include <IO/ReadBuffer.h>
+#include <jni/jni_common.h>
+#include <Common/JNIUtils.h>
+
+namespace local_engine::delta
+{
+
+jclass DeltaUtil::delta_jni_class = nullptr;
+jmethodID DeltaUtil::delta_jni_encode_uuid = nullptr;
+jmethodID DeltaUtil::delta_jni_decode_uuid = nullptr;
+
+/// Resolves and caches the JNI handles used by encodeUUID() / decodeUUID():
+/// a global reference to DeltaWriterJNIWrapper and the IDs of its static
+/// `encodeUUID(String, String)` and `decodeUUID(String)` methods.
+/// NOTE(review): presumably called once while the native library is loaded - confirm against the caller.
+void DeltaUtil::initJavaCallerReference(JNIEnv * env)
+{
+    delta_jni_class = CreateGlobalClassReference(env, 
"Lorg/apache/gluten/vectorized/DeltaWriterJNIWrapper;");
+    delta_jni_encode_uuid
+        = GetStaticMethodID(env, delta_jni_class, "encodeUUID", 
"(Ljava/lang/String;Ljava/lang/String;)Ljava/lang/String;");
+    delta_jni_decode_uuid = GetStaticMethodID(env, delta_jni_class, 
"decodeUUID", "(Ljava/lang/String;)Ljava/lang/String;");
+}
+
+/// Releases the global class reference created by initJavaCallerReference() and clears the
+/// cached handles, so a repeated call is a no-op and no stale handle stays visible.
+void DeltaUtil::releaseJavaCallerReference(JNIEnv * env)
+{
+    if (delta_jni_class)
+    {
+        env->DeleteGlobalRef(delta_jni_class);
+        // Reset the pointer: deleting the same global reference twice is invalid JNI usage.
+        delta_jni_class = nullptr;
+    }
+    // The method IDs were obtained from the class released above; drop them together with it.
+    delta_jni_encode_uuid = nullptr;
+    delta_jni_decode_uuid = nullptr;
+}
+
+/// Encodes `uuid` by calling the static Java method DeltaWriterJNIWrapper.encodeUUID(uuid, prefix).
+///
+/// @param uuid    textual UUID handed to the Java side
+/// @param prefix  random prefix handed to the Java side
+/// @return        the string returned by the Java method
+/// @throws std::runtime_error if the JNI handles are not initialised, a Java string cannot be
+///         created, the Java method throws, or it returns null. A pending Java exception is
+///         printed and cleared before the C++ exception is raised.
+String DeltaUtil::encodeUUID(String uuid, String prefix)
+{
+    GET_JNIENV(env)
+    jstring jUuid = env->NewStringUTF(uuid.c_str());
+    jstring jRandomPrefix = env->NewStringUTF(prefix.c_str());
+    jstring jResult = nullptr;
+    String encode;
+    bool succeeded = false;
+
+    // Only call into Java when the cached handles and both arguments are valid.
+    // NewStringUTF returns nullptr (leaving an exception pending) if it cannot allocate.
+    if (delta_jni_class && delta_jni_encode_uuid && jUuid && jRandomPrefix)
+        jResult = static_cast<jstring>(env->CallStaticObjectMethod(delta_jni_class, delta_jni_encode_uuid, jUuid, jRandomPrefix));
+
+    // If the Java method threw, jResult is null and must not be dereferenced.
+    if (!env->ExceptionCheck() && jResult)
+    {
+        if (const char * resultCStr = env->GetStringUTFChars(jResult, nullptr))
+        {
+            encode = resultCStr;
+            env->ReleaseStringUTFChars(jResult, resultCStr);
+            succeeded = true;
+        }
+    }
+
+    // Never leave a Java exception pending when returning to native code.
+    if (env->ExceptionCheck())
+    {
+        env->ExceptionDescribe();
+        env->ExceptionClear();
+    }
+
+    if (jUuid)
+        env->DeleteLocalRef(jUuid);
+    if (jRandomPrefix)
+        env->DeleteLocalRef(jRandomPrefix);
+    if (jResult)
+        env->DeleteLocalRef(jResult);
+    CLEAN_JNIENV
+
+    if (!succeeded)
+        throw std::runtime_error("DeltaUtil::encodeUUID: call to DeltaWriterJNIWrapper.encodeUUID failed for uuid '" + uuid + "'");
+    return encode;
+}
+
+/// Decodes `encodedUuid` by calling the static Java method DeltaWriterJNIWrapper.decodeUUID(encodedUuid).
+///
+/// @param encodedUuid  encoded UUID handed to the Java side
+/// @return             the string returned by the Java method
+/// @throws std::runtime_error if the JNI handles are not initialised, the Java string cannot be
+///         created, the Java method throws, or it returns null. A pending Java exception is
+///         printed and cleared before the C++ exception is raised.
+String DeltaUtil::decodeUUID(String encodedUuid)
+{
+    GET_JNIENV(env)
+    jstring j_encoded_uuid = env->NewStringUTF(encodedUuid.c_str());
+    jstring jResult = nullptr;
+    String decode_uuid;
+    bool succeeded = false;
+
+    // Only call into Java when the cached handles and the argument are valid.
+    if (delta_jni_class && delta_jni_decode_uuid && j_encoded_uuid)
+        jResult = static_cast<jstring>(env->CallStaticObjectMethod(delta_jni_class, delta_jni_decode_uuid, j_encoded_uuid));
+
+    // If the Java method threw, jResult is null and must not be dereferenced.
+    if (!env->ExceptionCheck() && jResult)
+    {
+        if (const char * resultCStr = env->GetStringUTFChars(jResult, nullptr))
+        {
+            decode_uuid = resultCStr;
+            env->ReleaseStringUTFChars(jResult, resultCStr);
+            succeeded = true;
+        }
+    }
+
+    // Never leave a Java exception pending when returning to native code.
+    if (env->ExceptionCheck())
+    {
+        env->ExceptionDescribe();
+        env->ExceptionClear();
+    }
+
+    if (j_encoded_uuid)
+        env->DeleteLocalRef(j_encoded_uuid);
+    if (jResult)
+        env->DeleteLocalRef(jResult);
+    CLEAN_JNIENV
+
+    if (!succeeded)
+        throw std::runtime_error("DeltaUtil::decodeUUID: call to DeltaWriterJNIWrapper.decodeUUID failed for '" + encodedUuid + "'");
+    return decode_uuid;
+}
+
+}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaUtil.h 
b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaUtil.h
new file mode 100644
index 0000000000..d4f80b7d05
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaUtil.h
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <jni.h>
+#include <jni/jni_common.h>
+
+
+namespace local_engine::delta
+{
+struct Codec
+{
+    struct Base85Codec
+    {
+        static constexpr Int32 ENCODED_UUID_LENGTH = 20;
+    };
+};
+
+
+static constexpr String UUID_DV_MARKER = "u";
+
+/// Static helpers that call back into the Java class DeltaWriterJNIWrapper over JNI.
+class DeltaUtil
+{
+public:
+    /// Looks up and caches the Java class and the IDs of its static encodeUUID / decodeUUID methods.
+    static void initJavaCallerReference(JNIEnv * env);
+    /// Deletes the global class reference obtained by initJavaCallerReference().
+    static void releaseJavaCallerReference(JNIEnv * env);
+
+    /// Returns the result of the Java static method encodeUUID(uuid, prefix).
+    static String encodeUUID(String uuid, String prefix);
+    /// Returns the result of the Java static method decodeUUID(encodedUuid).
+    static String decodeUUID(String encodedUuid);
+
+private:
+    /// Global reference to the Java wrapper class; nullptr until initialised.
+    static jclass delta_jni_class;
+    /// Method ID of the static Java method encodeUUID(String, String).
+    static jmethodID delta_jni_encode_uuid;
+    /// Method ID of the static Java method decodeUUID(String).
+    static jmethodID delta_jni_decode_uuid;
+};
+
+}
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.cpp 
b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.cpp
new file mode 100644
index 0000000000..926c3ab215
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.cpp
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "DeltaWriter.h"
+
+#include <random>
+#include <zlib.h>
+#include <Columns/ColumnNullable.h>
+#include <Columns/ColumnString.h>
+#include <Columns/ColumnsNumber.h>
+#include <Columns/IColumn.h>
+#include <Core/Block_fwd.h>
+#include <Core/Range.h>
+#include <DataTypes/DataTypeNullable.h>
+#include <DataTypes/DataTypeTuple.h>
+#include <DataTypes/DataTypesNumber.h>
+#include <Formats/JSONUtils.h>
+#include <IO/WriteBuffer.h>
+#include <IO/WriteHelpers.h>
+#include <Storages/Output/WriteBufferBuilder.h>
+#include <Storages/SubstraitSource/Delta/DeltaUtil.h>
+#include <rapidjson/document.h>
+#include <Poco/URI.h>
+
+namespace local_engine::delta
+{
+
+/// Returns a string of `length` characters drawn uniformly from [0-9A-Za-z].
+/// An empty string is returned when `length` is 0.
+String getRandomPrefix(const size_t & length)
+{
+    static constexpr char alphanum[] = "0123456789"
+                                       "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
+                                       "abcdefghijklmnopqrstuvwxyz";
+
+    /// One generator per thread, seeded from the OS entropy source. `rand()` is not required to be
+    /// thread-safe, and no srand() call is visible in this file, so it would yield the same prefix
+    /// sequence in every process.
+    thread_local std::mt19937 generator{std::random_device{}()};
+    /// sizeof(alphanum) counts the trailing NUL, hence the last valid index is sizeof - 2.
+    std::uniform_int_distribution<size_t> pick(0, sizeof(alphanum) - 2);
+
+    String res;
+    res.reserve(length);
+    for (size_t i = 0; i < length; ++i)
+        res += alphanum[pick(generator)];
+    return res;
+}
+
+/// Builds the named tuple type describing a deletion-vector descriptor:
+/// (storageType String, pathOrInlineDv String, offset Nullable(Int32), sizeInBytes Int32,
+///  cardinality Int64, maxRowIndex Nullable(Int64)).
+DB::DataTypePtr getDeletionVectorType()
+{
+    const auto storage_type = std::make_shared<DB::DataTypeString>();
+    const auto path_or_inline_dv = std::make_shared<DB::DataTypeString>();
+    const auto nullable_offset = std::make_shared<DB::DataTypeNullable>(std::make_shared<DB::DataTypeInt32>());
+    const auto size_in_bytes = std::make_shared<DB::DataTypeInt32>();
+    const auto cardinality = std::make_shared<DB::DataTypeInt64>();
+    const auto nullable_max_row_index = std::make_shared<DB::DataTypeNullable>(std::make_shared<DB::DataTypeInt64>());
+
+    /// Element order must match the name list below.
+    const DB::DataTypes field_types{storage_type, path_or_inline_dv, nullable_offset, size_in_bytes, cardinality, nullable_max_row_index};
+    const DB::Strings field_names{"storageType", "pathOrInlineDv", "offset", "sizeInBytes", "cardinality", "maxRowIndex"};
+
+    return std::make_shared<DB::DataTypeTuple>(field_types, field_names);
+}
+
+
+/// Appends one deletion vector per row of `block` to the current ".bin" file and records, for every
+/// row, the data file path, the resulting descriptor and the row count in the result columns.
+///
+/// Input columns are read by position:
+///   0 - data file path,
+///   1 - JSON descriptor of an already existing deletion vector (NULL or empty when there is none),
+///   2 - serialized bitmap of the newly deleted rows,
+///   3 - number of newly deleted rows.
+///
+/// Each entry is written as: big-endian Int32 size, bitmap bytes, big-endian Int32 CRC32 of the bytes.
+void DeltaWriter::writeDeletionVector(const DB::Block & block)
+{
+    const auto & file_path_columns = block.getByPosition(0);
+    const auto & deletion_vector_id_columns = block.getByPosition(1);
+    const auto & bitmap_columns = block.getByPosition(2);
+    const auto & cardinality_src_columns = block.getByPosition(3);
+
+    const size_t rows = block.rows();
+    for (size_t row_idx = 0; row_idx < rows; row_idx++)
+    {
+        const auto file_path = file_path_columns.column->getDataAt(row_idx);
+        auto bitmap = bitmap_columns.column->getDataAt(row_idx);
+        auto cardinality = cardinality_src_columns.column->get64(row_idx); // alias deletedRowIndexCount
+
+        /// Owns the bytes of a merged bitmap so that `bitmap` never refers to a destroyed temporary.
+        String merged_bitmap;
+
+        if (size_of_current_bin > 0 && bitmap.size + size_of_current_bin > packing_target_size)
+        {
+            /// The current file is full: close it. The size is reset together with the buffer so that a
+            /// row taking the `continue` path below cannot leave "size > 0" paired with a null buffer.
+            write_buffer->finalize();
+            write_buffer = nullptr;
+            size_of_current_bin = 0;
+        }
+
+        if (!deletion_vector_id_columns.column->isNullAt(row_idx))
+        {
+            DB::Field deletion_vector_id_field;
+            deletion_vector_id_columns.column->get(row_idx, deletion_vector_id_field);
+            auto existing_deletion_vector_id = deletion_vector_id_field.safeGet<String>();
+
+            if (!existing_deletion_vector_id.empty())
+            {
+                /// NOTE(review): the JSON is neither checked for parse errors nor for missing members.
+                rapidjson::Document existingDvDescriptor;
+                existingDvDescriptor.Parse(existing_deletion_vector_id.c_str());
+
+                String existing_path_or_inline_dv = existingDvDescriptor["pathOrInlineDv"].GetString();
+                Int32 existing_offset = existingDvDescriptor["offset"].GetInt();
+                Int32 existing_size_in_bytes = existingDvDescriptor["sizeInBytes"].GetInt();
+                Int64 existing_cardinality = existingDvDescriptor["cardinality"].GetInt64();
+
+                if (cardinality > 0)
+                {
+                    /// New deletions on top of an existing vector: merge both and write the union.
+                    DeltaDVRoaringBitmapArray existing_bitmap
+                        = deserializeExistingBitmap(existing_path_or_inline_dv, existing_offset, existing_size_in_bytes, table_path);
+                    existing_bitmap.merge(bitmap.toString());
+                    merged_bitmap = existing_bitmap.serialize();
+                    bitmap = merged_bitmap;
+                    cardinality = existing_bitmap.cardinality();
+                }
+                else
+                {
+                    // use already existing deletion vector
+                    auto dv_descriptor_field = createDeletionVectorDescriptorField(
+                        existing_path_or_inline_dv, existing_offset, existing_size_in_bytes, existing_cardinality);
+                    file_path_column->insertData(file_path.data, file_path.size);
+                    dv_descriptor_column->insert(dv_descriptor_field);
+                    matched_row_count_col->insert(cardinality);
+                    continue;
+                }
+            }
+        }
+
+        if (!write_buffer)
+            initBinPackage();
+
+        size_of_current_bin = size_of_current_bin + bitmap.size;
+        Int32 bitmap_size = static_cast<Int32>(bitmap.size);
+        DB::writeBinaryBigEndian(bitmap_size, *write_buffer);
+
+        write_buffer->write(bitmap.data, bitmap.size);
+        Int32 checksum_value = static_cast<Int32>(crc32_z(0L, reinterpret_cast<const unsigned char *>(bitmap.data), bitmap_size));
+        DB::writeBinaryBigEndian(checksum_value, *write_buffer);
+
+        /// `offset` still points at the start of the entry that was just written.
+        auto dv_descriptor_field
+            = createDeletionVectorDescriptorField(DeltaUtil::encodeUUID(uuid, prefix), offset, bitmap_size, cardinality);
+
+        file_path_column->insertData(file_path.data, file_path.size);
+        dv_descriptor_column->insert(dv_descriptor_field);
+        matched_row_count_col->insert(cardinality);
+
+        offset = sizeof(bitmap_size) + bitmap_size + sizeof(checksum_value) + offset;
+    }
+}
+
+/// Closes the deletion-vector file that is still open (if any) and returns the collected result rows
+/// as a heap-allocated block with columns filePath, deletionVector and matchedRowCount.
+///
+/// The caller owns the returned block. The result columns are moved out, so the writer must not be
+/// used for further writes afterwards.
+DB::Block * DeltaWriter::finalize()
+{
+    /// Test the buffer itself: after a bin rollover it can be null while size_of_current_bin is
+    /// still positive, and dereferencing it would crash.
+    if (write_buffer)
+    {
+        write_buffer->finalize();
+        write_buffer.reset();
+    }
+
+    DB::Block * res = new DB::Block(
+        {DB::ColumnWithTypeAndName(std::move(file_path_column), std::make_shared<DB::DataTypeString>(), "filePath"),
+         DB::ColumnWithTypeAndName(std::move(dv_descriptor_column), getDeletionVectorType(), "deletionVector"),
+         DB::ColumnWithTypeAndName(std::move(matched_row_count_col), std::make_shared<DB::DataTypeInt64>(), "matchedRowCount")});
+
+    return res;
+}
+
+
+/// Creates an empty tuple column with one sub-column per descriptor field, in the order
+/// storageType, pathOrInlineDv, offset, sizeInBytes, cardinality, maxRowIndex.
+DB::ColumnTuple::MutablePtr DeltaWriter::createDeletionVectorDescriptorColumn()
+{
+    auto nullable_offset = DB::ColumnNullable::create(DB::ColumnInt32::create(), DB::ColumnUInt8::create());
+    auto nullable_max_row_index = DB::ColumnNullable::create(DB::ColumnInt64::create(), DB::ColumnUInt8::create());
+
+    DB::MutableColumns fields;
+    fields.reserve(6);
+    fields.emplace_back(DB::ColumnString::create()); // storageType
+    fields.emplace_back(DB::ColumnString::create()); // pathOrInlineDv
+    fields.emplace_back(std::move(nullable_offset)); // offset
+    fields.emplace_back(DB::ColumnInt32::create()); // sizeInBytes
+    fields.emplace_back(DB::ColumnInt64::create()); // cardinality
+    fields.emplace_back(std::move(nullable_max_row_index)); // maxRowIndex
+
+    return DB::ColumnTuple::create(std::move(fields));
+}
+
+/// Builds "<table_path>/[<prefix>/]deletion_vector_<uuid>.bin"; the prefix directory is omitted
+/// when `prefix` is empty.
+String DeltaWriter::assembleDeletionVectorPath(const String & table_path, const String & prefix, const String & uuid) const
+{
+    String result = table_path;
+    result += "/";
+
+    if (!prefix.empty())
+    {
+        result += prefix;
+        result += "/";
+    }
+
+    result += DELETION_VECTOR_FILE_NAME_CORE;
+    result += "_";
+    result += uuid;
+    result += ".bin";
+    return result;
+}
+
+/// Opens a write buffer for the deletion-vector file identified by (table_path, prefix, uuid).
+/// The builder is chosen from the URI scheme of the percent-encoded file path.
+std::unique_ptr<DB::WriteBuffer> DeltaWriter::createWriteBuffer(const String & table_path, const String & prefix, const String & uuid) const
+{
+    const String target_file = assembleDeletionVectorPath(table_path, prefix, uuid);
+
+    std::string escaped_path;
+    Poco::URI::encode(target_file, "", escaped_path);
+
+    const Poco::URI target_uri(escaped_path);
+    return WriteBufferBuilderFactory::instance().createBuilder(target_uri.getScheme(), context)->build(target_uri.toString());
+}
+
+/// Loads an already written deletion vector.
+///
+/// `existing_path_or_inline_dv` is treated as "<random prefix><encoded uuid>", where the encoded uuid
+/// is the trailing Codec::Base85Codec::ENCODED_UUID_LENGTH characters. The bitmap is then read from
+/// the resolved file at `existing_offset` with `existing_size_in_bytes` bytes.
+/// NOTE(review): a value shorter than the encoded-uuid length is not guarded against - confirm
+/// callers never pass one.
+DeltaDVRoaringBitmapArray DeltaWriter::deserializeExistingBitmap(
+    const String & existing_path_or_inline_dv,
+    const Int32 & existing_offset,
+    const Int32 & existing_size_in_bytes,
+    const String & table_path) const
+{
+    const auto split_at = existing_path_or_inline_dv.length() - Codec::Base85Codec::ENCODED_UUID_LENGTH;
+    const auto dir_prefix = existing_path_or_inline_dv.substr(0, split_at);
+    const auto decoded_uuid = DeltaUtil::decodeUUID(existing_path_or_inline_dv.substr(split_at));
+
+    DeltaDVRoaringBitmapArray loaded;
+    loaded.rb_read(assembleDeletionVectorPath(table_path, dir_prefix, decoded_uuid), existing_offset, existing_size_in_bytes, context);
+    return loaded;
+}
+
+/// Packs one descriptor into a tuple value whose element order is
+/// (storageType, pathOrInlineDv, offset, sizeInBytes, cardinality, maxRowIndex).
+/// storageType is always UUID_DV_MARKER and maxRowIndex is always NULL.
+DB::Tuple DeltaWriter::createDeletionVectorDescriptorField(
+    const String & path_or_inline_dv, const Int32 & offset, const Int32 & size_in_bytes, const Int64 & cardinality)
+{
+    DB::Tuple descriptor;
+    descriptor.reserve(6);
+
+    descriptor.emplace_back(UUID_DV_MARKER); // storageType
+    descriptor.emplace_back(path_or_inline_dv); // pathOrInlineDv
+    descriptor.emplace_back(offset); // offset
+    descriptor.emplace_back(size_in_bytes); // sizeInBytes
+    descriptor.emplace_back(cardinality); // cardinality
+    descriptor.emplace_back(DB::Field{}); // maxRowIndex
+
+    return descriptor;
+}
+
+/// Starts a new deletion-vector file: resets the per-file counters, picks a fresh random prefix and
+/// UUID, opens the write buffer and writes the format version marker.
+void DeltaWriter::initBinPackage()
+{
+    offset = 0;
+    size_of_current_bin = 0;
+    prefix = getRandomPrefix(prefix_length);
+    uuid = DB::toString(DB::UUIDHelpers::generateV4());
+    write_buffer = createWriteBuffer(table_path, prefix, uuid);
+    DB::writeIntBinary(DV_FILE_FORMAT_VERSION_ID_V1, *write_buffer);
+    // Account for the version marker so that the first entry's offset points past it.
+    // NOTE(review): assumes the marker occupies exactly one byte - confirm its declared type.
+    offset++;
+}
+
+
+}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.h 
b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.h
new file mode 100644
index 0000000000..f6d7372ff4
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.h
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <Columns/ColumnString.h>
+#include <Columns/ColumnTuple.h>
+#include <Columns/ColumnsNumber.h>
+#include <Core/Block.h>
+#include <Core/Field.h>
+#include <Interpreters/Context_fwd.h>
+#include <Storages/SubstraitSource/Delta/Bitmap/DeltaDVRoaringBitmapArray.h>
+
+namespace local_engine::delta
+{
+
+/// Empty placeholder type: it declares no members and is not used by DeltaWriter below.
+struct DeletionVectorDescriptor
+{
+};
+
+/// Writes deletion-vector bitmaps into ".bin" files below a table path, packing several vectors into
+/// one file until `packing_target_size` would be exceeded, and collects one result row
+/// (filePath, deletionVector descriptor, matchedRowCount) per input row.
+///
+/// Usage: construct, call writeDeletionVector() for every input block, then call finalize() once.
+class DeltaWriter
+{
+    static constexpr String UUID_DV_MARKER = "u";
+    static constexpr String DELETION_VECTOR_FILE_NAME_CORE = "deletion_vector";
+
+public:
+    /// @param context_              context used to create write buffers and to read existing vectors
+    /// @param table_path_           root path under which deletion-vector files are created
+    /// @param prefix_length_        length of the random directory prefix (0 disables the prefix directory)
+    /// @param packing_target_size_  size threshold, in bitmap bytes, after which a new file is started
+    explicit DeltaWriter(
+        const DB::ContextPtr & context_, const String & table_path_, const size_t & prefix_length_, const size_t & packing_target_size_)
+        : context(context_), table_path(table_path_), prefix_length(prefix_length_), packing_target_size(packing_target_size_)
+    {
+        file_path_column = DB::ColumnString::create();
+        dv_descriptor_column = createDeletionVectorDescriptorColumn();
+        matched_row_count_col = DB::ColumnInt64::create();
+    }
+
+    /// Processes every row of `block`; see the definition for the expected column layout.
+    void writeDeletionVector(const DB::Block & block);
+
+    /// Closes the open file (if any) and returns the collected rows; the caller owns the block.
+    DB::Block * finalize();
+
+private:
+    DB::ColumnTuple::MutablePtr createDeletionVectorDescriptorColumn();
+    String assembleDeletionVectorPath(const String & table_path, const String & prefix, const String & uuid) const;
+    std::unique_ptr<DB::WriteBuffer> createWriteBuffer(const String & table_path, const String & prefix, const String & uuid) const;
+    DeltaDVRoaringBitmapArray deserializeExistingBitmap(
+        const String & existing_path_or_inline_dv,
+        const Int32 & existing_offset,
+        const Int32 & existing_size_in_bytes,
+        const String & table_path) const;
+    DB::Tuple createDeletionVectorDescriptorField(
+        const String & path_or_inline_dv, const Int32 & offset, const Int32 & size_in_bytes, const Int64 & cardinality);
+
+    void initBinPackage();
+
+    /// Held by value on purpose: the writer outlives the call that constructs it, so a reference
+    /// member would dangle as soon as the caller's (possibly temporary) smart pointer is destroyed.
+    const DB::ContextPtr context;
+    const String table_path;
+    const size_t prefix_length;
+    const size_t packing_target_size;
+
+    /// Result columns, filled row by row and moved out by finalize().
+    DB::MutableColumnPtr file_path_column;
+    DB::MutableColumnPtr dv_descriptor_column;
+    DB::MutableColumnPtr matched_row_count_col;
+    /// Buffer of the file currently being written; null when no file is open.
+    std::unique_ptr<DB::WriteBuffer> write_buffer;
+
+    /// Byte position in the current file at which the next entry starts.
+    size_t offset = 0;
+    /// Sum of the bitmap sizes written to the current file.
+    size_t size_of_current_bin = 0;
+    /// Random prefix and UUID identifying the current file.
+    String prefix;
+    String uuid;
+};
+
+
+}
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp 
b/cpp-ch/local-engine/local_engine_jni.cpp
index a4da80ae93..e3ac674360 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -46,6 +46,8 @@
 #include <Storages/MergeTree/StorageMergeTreeFactory.h>
 #include <Storages/Output/BlockStripeSplitter.h>
 #include <Storages/Output/NormalFileWriter.h>
+#include <Storages/SubstraitSource/Delta/DeltaWriter.h>
+#include <Storages/SubstraitSource/Delta/DeltaUtil.h>
 #include <Storages/SubstraitSource/ReadBufferBuilder.h>
 #include <jni/SharedPointerWrapper.h>
 #include <jni/jni_common.h>
@@ -198,6 +200,7 @@ JNIEXPORT void 
Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_n
     local_engine::BroadCastJoinBuilder::destroy(env);
     local_engine::SparkMergeTreeWriterJNI::destroy(env);
     local_engine::SparkRowInfoJNI::destroy(env);
+    local_engine::delta::DeltaUtil::releaseJavaCallerReference(env);
 
     env->DeleteGlobalRef(block_stripes_class);
     env->DeleteGlobalRef(split_result_class);
@@ -1334,6 +1337,46 @@ JNIEXPORT void 
Java_org_apache_gluten_execution_CHNativeCacheManager_removeFiles
     LOCAL_ENGINE_JNI_METHOD_END(env, );
 }
 
+/// JNI entry point: creates a native DeltaWriter bound to the current query context and returns its
+/// address as an opaque handle for the other DeltaWriterJNIWrapper calls.
+JNIEXPORT jlong Java_org_apache_gluten_vectorized_DeltaWriterJNIWrapper_createDeletionVectorWriter(
+    JNIEnv * env, jclass, jstring table_path_, jint prefix_length_, jlong packingTargetSize_)
+{
+    LOCAL_ENGINE_JNI_METHOD_START
+    const auto native_table_path = jstring2string(env, table_path_);
+    const auto current_context = local_engine::QueryContext::instance().currentQueryContext();
+
+    auto * delta_writer = new local_engine::delta::DeltaWriter(current_context, native_table_path, prefix_length_, packingTargetSize_);
+    return reinterpret_cast<jlong>(delta_writer);
+    LOCAL_ENGINE_JNI_METHOD_END(env, -1);
+}
+
+/// JNI entry point: feeds the block at `blockAddress` to the writer behind `writer_address_`.
+/// Both arguments are raw native addresses handed over from the JVM side.
+JNIEXPORT void Java_org_apache_gluten_vectorized_DeltaWriterJNIWrapper_deletionVectorWrite(
+    JNIEnv * env, jclass, jlong writer_address_, jlong blockAddress)
+{
+    LOCAL_ENGINE_JNI_METHOD_START
+    const auto & input_block = *reinterpret_cast<const DB::Block *>(blockAddress);
+    auto & delta_writer = *reinterpret_cast<local_engine::delta::DeltaWriter *>(writer_address_);
+    delta_writer.writeDeletionVector(input_block);
+    LOCAL_ENGINE_JNI_METHOD_END(env, );
+}
+
+/// JNI entry point: finalizes the writer behind `writer_address_`, destroys it, and returns the
+/// address of the heap-allocated result block. The handle must not be used again afterwards.
+JNIEXPORT jlong
+Java_org_apache_gluten_vectorized_DeltaWriterJNIWrapper_deletionVectorWriteFinalize(JNIEnv * env, jclass, jlong writer_address_)
+{
+    LOCAL_ENGINE_JNI_METHOD_START
+    /// Take ownership up front so the writer is released even when finalize() throws; a raw
+    /// `delete` after the call would be skipped in that case and the writer would leak.
+    /// (std::unique_ptr is already in scope through DeltaWriter.h.)
+    const std::unique_ptr<local_engine::delta::DeltaWriter> writer(
+        reinterpret_cast<local_engine::delta::DeltaWriter *>(writer_address_));
+    auto * column_batch = writer->finalize();
+    /// Cast to the declared return type, matching createDeletionVectorWriter.
+    return reinterpret_cast<jlong>(column_batch);
+    LOCAL_ENGINE_JNI_METHOD_END(env, -1);
+}
+
+/// JNI entry point: lets DeltaUtil set up its cached Java references for the calling JVM.
+/// The matching release call is DeltaUtil::releaseJavaCallerReference().
+JNIEXPORT void 
Java_org_apache_gluten_vectorized_DeltaWriterJNIWrapper_registerNativeReference(JNIEnv
 * env, jclass)
+{
+    LOCAL_ENGINE_JNI_METHOD_START
+    local_engine::delta::DeltaUtil::initJavaCallerReference(env);
+    LOCAL_ENGINE_JNI_METHOD_END(env, );
+}
+
 #ifdef __cplusplus
 }
 
diff --git a/cpp-ch/local-engine/tests/gtest_clickhouse_roaring_bitmap.cpp 
b/cpp-ch/local-engine/tests/gtest_clickhouse_roaring_bitmap.cpp
index 3dd13c9ccd..d5194c9f01 100644
--- a/cpp-ch/local-engine/tests/gtest_clickhouse_roaring_bitmap.cpp
+++ b/cpp-ch/local-engine/tests/gtest_clickhouse_roaring_bitmap.cpp
@@ -176,7 +176,7 @@ TEST(Delta_DV, DeltaDVRoaringBitmapArray)
     DeltaDVRoaringBitmapArray bitmap_array2{};
     bitmap_array2.rb_read(file_uri2, 1, 4047, context);
     EXPECT_FALSE(bitmap_array2.rb_is_empty());
-    EXPECT_EQ(2098, bitmap_array2.rb_size());
+    EXPECT_EQ(2098, bitmap_array2.cardinality());
     EXPECT_TRUE(bitmap_array2.rb_contains(0));
     EXPECT_TRUE(bitmap_array2.rb_contains(1003));
     EXPECT_TRUE(bitmap_array2.rb_contains(880));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to