This is an automated email from the ASF dual-hosted git repository.
loneylee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new a4b230bbbf [GLUTEN-9205][CH] Support deletion vector native write
(#9248)
a4b230bbbf is described below
commit a4b230bbbfc34994d2ad838b1c15d7211168b3a7
Author: Shuai li <[email protected]>
AuthorDate: Wed Apr 9 11:18:13 2025 +0800
[GLUTEN-9205][CH] Support deletion vector native write (#9248)
* [GLUTEN-9205][CH] Support deletion vector native write
---
.../gluten/vectorized/DeltaWriterJNIWrapper.java | 44 ++
.../gluten/sql/shims/delta32/Delta32Shims.scala | 7 +
.../commands/DMLWithDeletionVectorsHelper.scala | 701 +++++++++++++++++++++
.../execution/DeletionVectorWriteTransformer.scala | 175 +++++
.../GlutenDeltaParquetDeletionVectorSuite.scala | 118 +++-
.../apache/gluten/component/CHDeltaComponent.scala | 7 +
.../org/apache/gluten/sql/shims/DeltaShims.scala | 6 +-
.../AggregateFunctionDVRoaringBitmap.h | 2 +-
.../Delta/Bitmap/DeltaDVRoaringBitmapArray.cpp | 16 +-
.../Delta/Bitmap/DeltaDVRoaringBitmapArray.h | 8 +-
.../Storages/SubstraitSource/Delta/DeltaUtil.cpp | 80 +++
.../Storages/SubstraitSource/Delta/DeltaUtil.h | 51 ++
.../Storages/SubstraitSource/Delta/DeltaWriter.cpp | 252 ++++++++
.../Storages/SubstraitSource/Delta/DeltaWriter.h | 84 +++
cpp-ch/local-engine/local_engine_jni.cpp | 43 ++
.../tests/gtest_clickhouse_roaring_bitmap.cpp | 2 +-
16 files changed, 1586 insertions(+), 10 deletions(-)
diff --git
a/backends-clickhouse/src-delta-32/main/java/org/apache/gluten/vectorized/DeltaWriterJNIWrapper.java
b/backends-clickhouse/src-delta-32/main/java/org/apache/gluten/vectorized/DeltaWriterJNIWrapper.java
new file mode 100644
index 0000000000..ada976668c
--- /dev/null
+++
b/backends-clickhouse/src-delta-32/main/java/org/apache/gluten/vectorized/DeltaWriterJNIWrapper.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.vectorized;
+
+import org.apache.spark.sql.execution.DeletionVectorWriteTransformer;
+
+public class DeltaWriterJNIWrapper {
+
+ private DeltaWriterJNIWrapper() {
+ // utility class
+ }
+
+ public static native void registerNativeReference();
+
+ public static native long createDeletionVectorWriter(String tablePath, int
prefix_length, long packingTargetSize);
+
+ public static native void deletionVectorWrite(long writer_address, long
block_address);
+
+ public static native long deletionVectorWriteFinalize(long writer_address);
+
+ // call from native
+ public static String encodeUUID(String uuid, String randomPrefix) {
+ return DeletionVectorWriteTransformer.encodeUUID(uuid, randomPrefix);
+ }
+
+ // call from native
+ public static String decodeUUID(String encodedUuid) {
+ return DeletionVectorWriteTransformer.decodeUUID(encodedUuid);
+ }
+}
diff --git
a/backends-clickhouse/src-delta-32/main/scala/org/apache/gluten/sql/shims/delta32/Delta32Shims.scala
b/backends-clickhouse/src-delta-32/main/scala/org/apache/gluten/sql/shims/delta32/Delta32Shims.scala
index 52da7b7883..0244bf3ffd 100644
---
a/backends-clickhouse/src-delta-32/main/scala/org/apache/gluten/sql/shims/delta32/Delta32Shims.scala
+++
b/backends-clickhouse/src-delta-32/main/scala/org/apache/gluten/sql/shims/delta32/Delta32Shims.scala
@@ -19,7 +19,10 @@ package org.apache.gluten.sql.shims.delta32
import org.apache.gluten.execution.GlutenPlan
import org.apache.gluten.extension.{DeltaExpressionExtensionTransformer,
ExpressionExtensionTrait}
import org.apache.gluten.sql.shims.DeltaShims
+import org.apache.gluten.vectorized.DeltaWriterJNIWrapper
+import org.apache.spark.SparkContext
+import org.apache.spark.api.plugin.PluginContext
import org.apache.spark.sql.delta.DeltaParquetFileFormat
import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
import org.apache.spark.sql.delta.util.JsonUtils
@@ -41,6 +44,10 @@ class Delta32Shims extends DeltaShims {
DeltaOptimizedWriterTransformer.from(plan)
}
+ override def onDriverStart(sc: SparkContext, pc: PluginContext): Unit = {
+ DeltaWriterJNIWrapper.registerNativeReference()
+ }
+
override def registerExpressionExtension(): Unit = {
ExpressionExtensionTrait.registerExpressionExtension(DeltaExpressionExtensionTransformer())
}
diff --git
a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/DMLWithDeletionVectorsHelper.scala
b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/DMLWithDeletionVectorsHelper.scala
new file mode 100644
index 0000000000..889c28cf0c
--- /dev/null
+++
b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/DMLWithDeletionVectorsHelper.scala
@@ -0,0 +1,701 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta.commands
+
+import org.apache.gluten.config.GlutenConfig
+
+import java.util.UUID
+import scala.collection.generic.Sizing
+import org.apache.spark.sql.catalyst.expressions.aggregation.BitmapAggregator
+import org.apache.spark.sql.delta.{DeltaLog, DeltaParquetFileFormat,
OptimisticTransaction, Snapshot}
+import org.apache.spark.sql.delta.DeltaParquetFileFormat._
+import org.apache.spark.sql.delta.actions.{AddFile, DeletionVectorDescriptor,
FileAction}
+import org.apache.spark.sql.delta.deletionvectors.{RoaringBitmapArray,
RoaringBitmapArrayFormat, StoredBitmap}
+import org.apache.spark.sql.delta.files.{TahoeBatchFileIndex, TahoeFileIndex}
+import org.apache.spark.sql.delta.metering.DeltaLogging
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.stats.StatsCollectionUtils
+import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore
+import org.apache.spark.sql.delta.util.{BinPackingIterator, DeltaEncoder,
JsonUtils, PathWithFileSystem, Utils => DeltaUtils}
+import org.apache.spark.sql.delta.util.DeltaFileOperations.absolutePath
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.spark.paths.SparkPath
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.execution.DeletionVectorWriteTransformer
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation,
LogicalRelation}
+import org.apache.spark.sql.execution.datasources.FileFormat.{FILE_PATH,
METADATA_NAME}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.functions.{col, lit}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{SerializableConfiguration, Utils => SparkUtils}
+
+/**
+ * Gluten overwrite Delta:
+ *
+ * This file is copied from Delta 3.2.1.
+ */
+
+/**
+ * Contains utility classes and method for performing DML operations with
Deletion Vectors.
+ */
+object DMLWithDeletionVectorsHelper extends DeltaCommand {
+ val SUPPORTED_DML_COMMANDS: Seq[String] = Seq("DELETE", "UPDATE")
+
+ /**
+ * Creates a DataFrame that can be used to scan for rows matching the
condition in the given
+ * files. Generally the given file list is a pruned file list using the
stats based pruning.
+ */
+ def createTargetDfForScanningForMatches(
+ spark: SparkSession,
+ target: LogicalPlan,
+ fileIndex: TahoeFileIndex): DataFrame = {
+ Dataset.ofRows(spark, replaceFileIndex(spark, target, fileIndex))
+ }
+
+ /**
+ * Replace the file index in a logical plan and return the updated plan.
+ * It's a common pattern that, in Delta commands, we use data skipping to
determine a subset of
+ * files that can be affected by the command, so we replace the whole-table
file index in the
+ * original logical plan with a new index of potentially affected files,
while everything else in
+ * the original plan, e.g., resolved references, remain unchanged.
+ *
+ * In addition we also request a metadata column and a row index column from
the Scan to help
+ * generate the Deletion Vectors. When predicate pushdown is enabled, we
only request the
+ * metadata column. This is because we can utilize _metadata.row_index
instead of generating a
+ * custom one.
+ *
+ * @param spark the active spark session
+ * @param target the logical plan in which we replace the file index
+ * @param fileIndex the new file index
+ */
+ private def replaceFileIndex(
+ spark: SparkSession,
+ target: LogicalPlan,
+ fileIndex: TahoeFileIndex): LogicalPlan = {
+ val useMetadataRowIndex =
+
spark.sessionState.conf.getConf(DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX)
+ // This is only used when predicate pushdown is disabled.
+ val rowIndexCol = AttributeReference(ROW_INDEX_COLUMN_NAME,
ROW_INDEX_STRUCT_FIELD.dataType)()
+
+ var fileMetadataCol: AttributeReference = null
+
+ val newTarget = target.transformUp {
+ case l @ LogicalRelation(
+ hfsr @ HadoopFsRelation(_, _, _, _, format: DeltaParquetFileFormat,
_), _, _, _) =>
+ fileMetadataCol = format.createFileMetadataCol()
+ // Take the existing schema and add additional metadata columns
+ if (useMetadataRowIndex) {
+ l.copy(
+ relation = hfsr.copy(location = fileIndex)(hfsr.sparkSession),
+ output = l.output :+ fileMetadataCol)
+ } else {
+ val newDataSchema =
+ StructType(hfsr.dataSchema).add(ROW_INDEX_STRUCT_FIELD)
+ val finalOutput = l.output ++ Seq(rowIndexCol, fileMetadataCol)
+ // Disable splitting and filter pushdown in order to generate the
row-indexes.
+ val newFormat = format.copy(optimizationsEnabled = false)
+ val newBaseRelation = hfsr.copy(
+ location = fileIndex,
+ dataSchema = newDataSchema,
+ fileFormat = newFormat)(hfsr.sparkSession)
+
+ l.copy(relation = newBaseRelation, output = finalOutput)
+ }
+ case p @ Project(projectList, _) =>
+ if (fileMetadataCol == null) {
+ throw new IllegalStateException("File metadata column is not yet
created.")
+ }
+ val rowIndexColOpt = if (useMetadataRowIndex) None else
Some(rowIndexCol)
+ val additionalColumns = Seq(fileMetadataCol) ++ rowIndexColOpt
+ p.copy(projectList = projectList ++ additionalColumns)
+ }
+ newTarget
+ }
+
+ /**
+ * Find the target table files that contain rows that satisfy the condition
and a DV attached
+ * to each file that indicates the rows marked as deleted from the file
+ */
+ def findTouchedFiles(
+ sparkSession: SparkSession,
+ txn: OptimisticTransaction,
+ hasDVsEnabled: Boolean,
+ deltaLog: DeltaLog,
+ targetDf: DataFrame,
+ fileIndex: TahoeFileIndex,
+ condition: Expression,
+ opName: String): Seq[TouchedFileWithDV] = {
+ require(
+ SUPPORTED_DML_COMMANDS.contains(opName),
+ s"Expecting opName to be one of ${SUPPORTED_DML_COMMANDS.mkString(",
")}, " +
+ s"but got '$opName'.")
+
+ recordDeltaOperation(deltaLog, opType = s"$opName.findTouchedFiles") {
+ val candidateFiles = fileIndex match {
+ case f: TahoeBatchFileIndex => f.addFiles
+ case _ => throw new IllegalArgumentException("Unexpected file index
found!")
+ }
+
+ val matchedRowIndexSets =
+
DeletionVectorBitmapGenerator.buildRowIndexSetsForFilesMatchingCondition(
+ sparkSession,
+ txn,
+ hasDVsEnabled,
+ targetDf,
+ candidateFiles,
+ condition)
+
+ val nameToAddFileMap = generateCandidateFileMap(txn.deltaLog.dataPath,
candidateFiles)
+ findFilesWithMatchingRows(txn, nameToAddFileMap, matchedRowIndexSets)
+ }
+ }
+
+ /**
+ * Finds the files in nameToAddFileMap in which rows were deleted by
checking the row index set.
+ */
+ def findFilesWithMatchingRows(
+ txn: OptimisticTransaction,
+ nameToAddFileMap: Map[String, AddFile],
+ matchedFileRowIndexSets: Seq[DeletionVectorResult]):
Seq[TouchedFileWithDV] = {
+ // Get the AddFiles using the touched file names and group them together
with other
+ // information we need for later phases.
+ val dataPath = txn.deltaLog.dataPath
+ val touchedFilesWithMatchedRowIndices = matchedFileRowIndexSets.map {
fileRowIndex =>
+ val filePath = fileRowIndex.filePath
+ val addFile = getTouchedFile(dataPath, filePath, nameToAddFileMap)
+ TouchedFileWithDV(
+ filePath,
+ addFile,
+ fileRowIndex.deletionVector,
+ fileRowIndex.matchedRowCount)
+ }
+
+ logTrace("findTouchedFiles: matched files:\n\t" +
+
s"${touchedFilesWithMatchedRowIndices.map(_.inputFilePath).mkString("\n\t")}")
+
+ touchedFilesWithMatchedRowIndices.filterNot(_.isUnchanged)
+ }
+
+ def processUnmodifiedData(
+ spark: SparkSession,
+ touchedFiles: Seq[TouchedFileWithDV],
+ snapshot: Snapshot): (Seq[FileAction], Map[String, Long]) = {
+ val numModifiedRows: Long = touchedFiles.map(_.numberOfModifiedRows).sum
+ val numRemovedFiles: Long = touchedFiles.count(_.isFullyReplaced())
+
+ val (fullyRemovedFiles, notFullyRemovedFiles) =
touchedFiles.partition(_.isFullyReplaced())
+
+ val timestamp = System.currentTimeMillis()
+ val fullyRemoved =
fullyRemovedFiles.map(_.fileLogEntry.removeWithTimestamp(timestamp))
+
+ val dvUpdates = notFullyRemovedFiles.map { fileWithDVInfo =>
+ fileWithDVInfo.fileLogEntry.removeRows(
+ deletionVector = fileWithDVInfo.newDeletionVector,
+ updateStats = false
+ )}
+ val (dvAddFiles, dvRemoveFiles) = dvUpdates.unzip
+ val dvAddFilesWithStats = getActionsWithStats(spark, dvAddFiles, snapshot)
+
+ var (numDeletionVectorsAdded, numDeletionVectorsRemoved,
numDeletionVectorsUpdated) =
+ dvUpdates.foldLeft((0L, 0L, 0L)) { case ((added, removed, updated),
(addFile, removeFile)) =>
+ (Option(addFile.deletionVector), Option(removeFile.deletionVector))
match {
+ case (Some(_), Some(_)) => (added, removed, updated + 1)
+ case (None, Some(_)) => (added, removed + 1, updated)
+ case (Some(_), None) => (added + 1, removed, updated)
+ case _ => (added, removed, updated)
+ }
+ }
+ numDeletionVectorsRemoved += fullyRemoved.count(_.deletionVector != null)
+ val metricMap = Map(
+ "numModifiedRows" -> numModifiedRows,
+ "numRemovedFiles" -> numRemovedFiles,
+ "numDeletionVectorsAdded" -> numDeletionVectorsAdded,
+ "numDeletionVectorsRemoved" -> numDeletionVectorsRemoved,
+ "numDeletionVectorsUpdated" -> numDeletionVectorsUpdated)
+ (fullyRemoved ++ dvAddFilesWithStats ++ dvRemoveFiles, metricMap)
+ }
+
+ /** Fetch stats for `addFiles`. */
+ private def getActionsWithStats(
+ spark: SparkSession,
+ addFilesWithNewDvs: Seq[AddFile],
+ snapshot: Snapshot): Seq[AddFile] = {
+ import org.apache.spark.sql.delta.implicits._
+
+ if (addFilesWithNewDvs.isEmpty) return Seq.empty
+
+ val selectionPathAndStatsCols = Seq(col("path"), col("stats"))
+ val addFilesWithNewDvsDf = addFilesWithNewDvs.toDF(spark)
+
+ // These files originate from snapshot.filesForScan which resets column
statistics.
+ // Since these objects don't carry stats and tags, if we were to use them
as result actions of
+ // the operation directly, we'd effectively be removing all stats and
tags. To resolve this
+ // we join the list of files with DVs with the log (allFiles) to retrieve
statistics. This is
+ // expected to have better performance than supporting full stats retrieval
+ // in snapshot.filesForScan because it only affects a subset of the
scanned files.
+
+ // Find the current metadata with stats for all files with new DV
+ val addFileWithStatsDf = snapshot.withStats
+ .join(addFilesWithNewDvsDf.select("path"), "path")
+
+ // Update the existing stats to set the tightBounds to false and also set
the appropriate
+ // null count. We want to set the bounds before the AddFile has DV
descriptor attached.
+ // Attaching the DV descriptor here, causes wrong logical records
computation in
+ // `updateStatsToWideBounds`.
+ val statsColName = snapshot.getBaseStatsColumnName
+ val addFilesWithWideBoundsDf = snapshot
+ .updateStatsToWideBounds(addFileWithStatsDf, statsColName)
+
+ val (filesWithNoStats, filesWithExistingStats) = {
+ // numRecords is the only stat we really have to guarantee.
+ // If the others are missing, we do not need to fetch them.
+ addFilesWithWideBoundsDf.as[AddFile].collect().toSeq
+ .partition(_.numPhysicalRecords.isEmpty)
+ }
+
+ // If we encounter files with no stats we fetch the stats from the parquet
footer.
+ // Files with persistent DVs *must* have (at least numRecords) stats
according to the
+ // Delta spec.
+ val filesWithFetchedStats =
+ if (filesWithNoStats.nonEmpty) {
+ StatsCollectionUtils.computeStats(spark,
+ conf = snapshot.deltaLog.newDeltaHadoopConf(),
+ dataPath = snapshot.deltaLog.dataPath,
+ addFiles = filesWithNoStats.toDS(spark),
+ numFilesOpt = Some(filesWithNoStats.size),
+ columnMappingMode = snapshot.metadata.columnMappingMode,
+ dataSchema = snapshot.dataSchema,
+ statsSchema = snapshot.statsSchema,
+ setBoundsToWide = true)
+ .collect()
+ .toSeq
+ } else {
+ Seq.empty
+ }
+
+ val allAddFilesWithUpdatedStats =
+ (filesWithExistingStats ++ filesWithFetchedStats).toSeq.toDF(spark)
+
+ // Now join the allAddFilesWithUpdatedStats with addFilesWithNewDvs
+ // so that the updated stats are joined with the new DV info
+ addFilesWithNewDvsDf.drop("stats")
+ .join(
+ allAddFilesWithUpdatedStats.select(selectionPathAndStatsCols: _*),
"path")
+ .as[AddFile]
+ .collect()
+ .toSeq
+ }
+}
+
+object DeletionVectorBitmapGenerator {
+ final val FILE_NAME_COL = "filePath"
+ final val FILE_DV_ID_COL = "deletionVectorId"
+ final val ROW_INDEX_COL = "rowIndexCol"
+ final val DELETED_ROW_INDEX_BITMAP = "deletedRowIndexSet"
+ final val DELETED_ROW_INDEX_COUNT = "deletedRowIndexCount"
+ final val MAX_ROW_INDEX_COL = "maxRowIndexCol"
+
+ private class DeletionVectorSet(
+ spark: SparkSession,
+ target: DataFrame,
+ targetDeltaLog: DeltaLog,
+ deltaTxn: OptimisticTransaction) {
+
+ case object CardinalityAndBitmapStruct {
+ val name: String = "CardinalityAndBitmapStruct"
+ def cardinality: String = s"$name.cardinality"
+ def bitmap: String = s"$name.bitmap"
+ }
+
+ def computeResult(): Seq[DeletionVectorResult] = {
+ val aggregated = target
+ .groupBy(col(FILE_NAME_COL), col(FILE_DV_ID_COL))
+ .agg(aggColumns.head, aggColumns.tail: _*)
+ .select(outputColumns: _*)
+
+ // --- modified start
+ if (spark.conf
+ .get(GlutenConfig.GLUTEN_ENABLED.key,
GlutenConfig.GLUTEN_ENABLED.defaultValueString)
+ .toBoolean) {
+ DeletionVectorWriteTransformer.replace(aggregated,
targetDeltaLog.dataPath, deltaTxn, spark)
+ } else {
+ import DeletionVectorResult.encoder
+ val rowIndexData = aggregated.as[DeletionVectorData]
+ val storedResults = rowIndexData.mapPartitions(bitmapStorageMapper())
+ storedResults.as[DeletionVectorResult].collect()
+ }
+ // --- modified end
+ }
+
+ protected def aggColumns: Seq[Column] = {
+
Seq(createBitmapSetAggregator(col(ROW_INDEX_COL)).as(CardinalityAndBitmapStruct.name))
+ }
+
+ /** Create a bitmap set aggregator over the given column */
+ private def createBitmapSetAggregator(indexColumn: Column): Column = {
+ val func = new BitmapAggregator(indexColumn.expr,
RoaringBitmapArrayFormat.Portable)
+ new Column(func.toAggregateExpression(isDistinct = false))
+ }
+
+ protected def outputColumns: Seq[Column] =
+ Seq(
+ col(FILE_NAME_COL),
+ col(FILE_DV_ID_COL),
+ col(CardinalityAndBitmapStruct.bitmap).as(DELETED_ROW_INDEX_BITMAP),
+ col(CardinalityAndBitmapStruct.cardinality).as(DELETED_ROW_INDEX_COUNT)
+ )
+
+ protected def bitmapStorageMapper()
+ : Iterator[DeletionVectorData] => Iterator[DeletionVectorResult] = {
+ val prefixLen = DeltaUtils.getRandomPrefixLength(deltaTxn.metadata)
+ DeletionVectorWriter.createMapperToStoreDeletionVectors(
+ spark,
+ targetDeltaLog.newDeltaHadoopConf(),
+ targetDeltaLog.dataPath,
+ prefixLen)
+ }
+ }
+
+ /**
+ * Build bitmap compressed sets of row indices for each file in [[target]]
using
+ * [[ROW_INDEX_COL]].
+ * Write those sets out to temporary files and collect the file names,
+ * together with some encoded metadata about the contents.
+ *
+ * @param target DataFrame with expected schema [[FILE_NAME_COL]],
[[ROW_INDEX_COL]],
+ */
+ def buildDeletionVectors(
+ spark: SparkSession,
+ target: DataFrame,
+ targetDeltaLog: DeltaLog,
+ deltaTxn: OptimisticTransaction): Seq[DeletionVectorResult] = {
+ val rowIndexSet = new DeletionVectorSet(spark, target, targetDeltaLog,
deltaTxn)
+ rowIndexSet.computeResult()
+ }
+
+ def buildRowIndexSetsForFilesMatchingCondition(
+ sparkSession: SparkSession,
+ txn: OptimisticTransaction,
+ tableHasDVs: Boolean,
+ targetDf: DataFrame,
+ candidateFiles: Seq[AddFile],
+ condition: Expression,
+ fileNameColumnOpt: Option[Column] = None,
+ rowIndexColumnOpt: Option[Column] = None): Seq[DeletionVectorResult] = {
+ val useMetadataRowIndexConf =
DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX
+ val useMetadataRowIndex =
sparkSession.sessionState.conf.getConf(useMetadataRowIndexConf)
+ val fileNameColumn =
fileNameColumnOpt.getOrElse(col(s"${METADATA_NAME}.${FILE_PATH}"))
+ val rowIndexColumn = if (useMetadataRowIndex) {
+
rowIndexColumnOpt.getOrElse(col(s"${METADATA_NAME}.${ParquetFileFormat.ROW_INDEX}"))
+ } else {
+ rowIndexColumnOpt.getOrElse(col(ROW_INDEX_COLUMN_NAME))
+ }
+ val matchedRowsDf = targetDf
+ .withColumn(FILE_NAME_COL, fileNameColumn)
+ // Filter after getting input file name as the filter might introduce a
join and we
+ // cannot get input file name on join's output.
+ .filter(new Column(condition))
+ .withColumn(ROW_INDEX_COL, rowIndexColumn)
+
+ val df = if (tableHasDVs) {
+ // When the table already has DVs, join the `matchedRowDf` above to
attach for each matched
+ // file its existing DeletionVectorDescriptor
+ val basePath = txn.deltaLog.dataPath.toString
+ val filePathToDV = candidateFiles.map { add =>
+ val serializedDV = Option(add.deletionVector).map(dvd =>
JsonUtils.toJson(dvd))
+ // Paths in the metadata column are canonicalized. Thus we must
canonicalize the DV path.
+ FileToDvDescriptor(
+ SparkPath.fromPath(absolutePath(basePath, add.path)).urlEncoded,
+ serializedDV)
+ }
+ val filePathToDVDf = sparkSession.createDataset(filePathToDV)
+
+ val joinExpr = filePathToDVDf("path") === matchedRowsDf(FILE_NAME_COL)
+ // Perform leftOuter join to make sure we do not eliminate any rows
because of path
+ // encoding issues. If there is such an issue we will detect it during
the aggregation
+ // of the bitmaps.
+ val joinedDf = matchedRowsDf.join(filePathToDVDf, joinExpr, "leftOuter")
+ .drop(FILE_NAME_COL)
+ .withColumnRenamed("path", FILE_NAME_COL)
+ joinedDf
+ } else {
+ // When the table has no DVs, just add a column to indicate that the
existing dv is null
+ matchedRowsDf.withColumn(FILE_DV_ID_COL, lit(null))
+ }
+
+ DeletionVectorBitmapGenerator.buildDeletionVectors(sparkSession, df,
txn.deltaLog, txn)
+ }
+}
+
+/**
+ * Holds a mapping from a file path (url-encoded) to an (optional) serialized
Deletion Vector
+ * descriptor.
+ */
+case class FileToDvDescriptor(path: String, deletionVectorId: Option[String])
+
+object FileToDvDescriptor {
+ private lazy val _encoder = new DeltaEncoder[FileToDvDescriptor]
+ implicit def encoder: Encoder[FileToDvDescriptor] = _encoder.get
+}
+
+/**
+ * Row containing the file path and its new deletion vector bitmap in memory
+ *
+ * @param filePath Absolute path of the data file this DV result
is generated for.
+ * @param deletionVectorId Existing [[DeletionVectorDescriptor]]
serialized in JSON format.
+ * This info is used to load the existing DV with
the new DV.
+ * @param deletedRowIndexSet In-memory Deletion vector bitmap generated
containing the newly
+ * deleted row indexes from data file.
+ * @param deletedRowIndexCount Count of rows marked as deleted using the
[[deletedRowIndexSet]].
+ */
+case class DeletionVectorData(
+ filePath: String,
+ deletionVectorId: Option[String],
+ deletedRowIndexSet: Array[Byte],
+ deletedRowIndexCount: Long) extends Sizing {
+
+ /** The size of the bitmaps to use in [[BinPackingIterator]]. */
+ override def size: Int = deletedRowIndexSet.length
+}
+
+object DeletionVectorData {
+ private lazy val _encoder = new DeltaEncoder[DeletionVectorData]
+ implicit def encoder: Encoder[DeletionVectorData] = _encoder.get
+
+ def apply(filePath: String, rowIndexSet: Array[Byte], rowIndexCount: Long):
DeletionVectorData = {
+ DeletionVectorData(
+ filePath = filePath,
+ deletionVectorId = None,
+ deletedRowIndexSet = rowIndexSet,
+ deletedRowIndexCount = rowIndexCount)
+ }
+}
+
+/** Final output for each file containing the file path,
DeletionVectorDescriptor and how many
+ * rows are marked as deleted in this file as part of this operation
(doesn't include rows that
+ * are already marked as deleted).
+ *
+ * @param filePath Absolute path of the data file this DV result is
generated for.
+ * @param deletionVector Deletion vector generated containing the newly
deleted row indices from
+ * data file.
+ * @param matchedRowCount Number of rows marked as deleted using the
[[deletionVector]].
+ */
+case class DeletionVectorResult(
+ filePath: String,
+ deletionVector: DeletionVectorDescriptor,
+ matchedRowCount: Long) {
+}
+
+object DeletionVectorResult {
+ private lazy val _encoder = new DeltaEncoder[DeletionVectorResult]
+ implicit def encoder: Encoder[DeletionVectorResult] = _encoder.get
+
+ def fromDeletionVectorData(
+ data: DeletionVectorData,
+ deletionVector: DeletionVectorDescriptor): DeletionVectorResult = {
+ DeletionVectorResult(
+ filePath = data.filePath,
+ deletionVector = deletionVector,
+ matchedRowCount = data.deletedRowIndexCount)
+ }
+}
+
+case class TouchedFileWithDV(
+ inputFilePath: String,
+ fileLogEntry: AddFile,
+ newDeletionVector: DeletionVectorDescriptor,
+ deletedRows: Long) {
+ /**
+ * Checks the *sufficient* condition for a file being fully replaced by the
current operation.
+ * (That is, all rows are either being updated or deleted.)
+ */
+ def isFullyReplaced(): Boolean = {
+ fileLogEntry.numLogicalRecords match {
+ case Some(numRecords) => numRecords == numberOfModifiedRows
+ case None => false // must make defensive assumption if no statistics
are available
+ }
+ }
+
+ /**
+ * Checks if the file is unchanged by the current operation.
+ * (That is no row has been updated or deleted.)
+ */
+ def isUnchanged: Boolean = {
+ // If the bitmap is empty then no row would be removed during the rewrite,
+ // thus the file is unchanged.
+ numberOfModifiedRows == 0
+ }
+
+ /**
+ * The number of rows that are modified in this file.
+ */
+ def numberOfModifiedRows: Long = newDeletionVector.cardinality -
fileLogEntry.numDeletedRecords
+}
+
+/**
+ * Utility methods to write the deletion vector to storage. If a particular
file already
+ * has an existing DV, it will be merged with the new deletion vector and
written to storage.
+ */
+object DeletionVectorWriter extends DeltaLogging {
+ /**
+ * The context for [[createDeletionVectorMapper]] callback functions.
Contains the DV writer that
+ * is used by callback functions to write the new DVs.
+ */
+ case class DeletionVectorMapperContext(
+ dvStore: DeletionVectorStore,
+ writer: DeletionVectorStore.Writer,
+ tablePath: Path,
+ fileId: UUID,
+ prefix: String)
+
+ /**
+ * Prepare a mapper function for storing deletion vectors.
+ *
+ * For each DeletionVector the writer will create a
[[DeletionVectorMapperContext]] that contains
+ * a DV writer that is used to write the DV into a file.
+ *
+ * The result can be used with
[[org.apache.spark.sql.Dataset.mapPartitions()]] and must thus be
+ * serialized.
+ */
+ def createDeletionVectorMapper[InputT <: Sizing, OutputT](
+ sparkSession: SparkSession,
+ hadoopConf: Configuration,
+ table: Path,
+ prefixLength: Int)
+ (callbackFn: (DeletionVectorMapperContext, InputT) => OutputT)
+ : Iterator[InputT] => Iterator[OutputT] = {
+ val broadcastHadoopConf = sparkSession.sparkContext.broadcast(
+ new SerializableConfiguration(hadoopConf))
+ // hadoop.fs.Path is not Serializable, so close over the String
representation instead
+ val tablePathString = DeletionVectorStore.pathToEscapedString(table)
+ val packingTargetSize =
+ sparkSession.conf.get(DeltaSQLConf.DELETION_VECTOR_PACKING_TARGET_SIZE)
+
+ // This is the (partition) mapper function we are returning
+ (rowIterator: Iterator[InputT]) => {
+ val dvStore =
DeletionVectorStore.createInstance(broadcastHadoopConf.value.value)
+ val tablePath = DeletionVectorStore.escapedStringToPath(tablePathString)
+ val tablePathWithFS = dvStore.pathWithFileSystem(tablePath)
+
+ val perBinFunction: Seq[InputT] => Seq[OutputT] = (rows: Seq[InputT]) =>
{
+ val prefix = DeltaUtils.getRandomPrefix(prefixLength)
+ val (writer, fileId) = createWriter(dvStore, tablePathWithFS, prefix)
+ val ctx = DeletionVectorMapperContext(
+ dvStore,
+ writer,
+ tablePath,
+ fileId,
+ prefix)
+ val result = SparkUtils.tryWithResource(writer) { writer =>
+ rows.map(r => callbackFn(ctx, r))
+ }
+ result
+ }
+
+ val binPackedRowIterator = new BinPackingIterator(rowIterator,
packingTargetSize)
+ binPackedRowIterator.flatMap(perBinFunction)
+ }
+ }
+
+ /**
+ * Creates a writer for writing multiple DVs in the same file.
+ *
+ * Returns the writer and the UUID of the new file.
+ */
+ def createWriter(
+ dvStore: DeletionVectorStore,
+ tablePath: PathWithFileSystem,
+ prefix: String = ""): (DeletionVectorStore.Writer, UUID) = {
+ val fileId = UUID.randomUUID()
+ val writer =
dvStore.createWriter(dvStore.generateFileNameInTable(tablePath, fileId, prefix))
+ (writer, fileId)
+ }
+
+ /** Store the `bitmapData` on cloud storage. */
+ def storeSerializedBitmap(
+ ctx: DeletionVectorMapperContext,
+ bitmapData: Array[Byte],
+ cardinality: Long): DeletionVectorDescriptor = {
+ if (cardinality == 0L) {
+ DeletionVectorDescriptor.EMPTY
+ } else {
+ val dvRange = ctx.writer.write(bitmapData)
+ DeletionVectorDescriptor.onDiskWithRelativePath(
+ id = ctx.fileId,
+ randomPrefix = ctx.prefix,
+ sizeInBytes = bitmapData.length,
+ cardinality = cardinality,
+ offset = Some(dvRange.offset))
+ }
+ }
+
+ /**
+ * Prepares a mapper function that can be used by DML commands to store the
Deletion Vectors
+ * that are described in [[DeletionVectorData]] and return their
descriptors
+ * [[DeletionVectorResult]].
+ */
+ def createMapperToStoreDeletionVectors(
+ sparkSession: SparkSession,
+ hadoopConf: Configuration,
+ table: Path,
+ prefixLength: Int): Iterator[DeletionVectorData] =>
Iterator[DeletionVectorResult] =
+ createDeletionVectorMapper(sparkSession, hadoopConf, table, prefixLength) {
+ (ctx, row) => storeBitmapAndGenerateResult(ctx, row)
+ }
+
+ /**
+ * Helper to generate and store the deletion vector bitmap. The deletion
vector is merged with
+ * the file's already existing deletion vector before being stored.
+ */
+ def storeBitmapAndGenerateResult(ctx: DeletionVectorMapperContext, row:
DeletionVectorData)
+ : DeletionVectorResult = {
+ // If a group with null path exists it means there was an issue while
joining with the log to
+ // fetch the DeletionVectorDescriptors.
+ assert(row.filePath != null,
+ s"""
+ |Encountered a non matched file path.
+ |It is likely that _metadata.file_path is not encoded by Spark as
expected.
+ |""".stripMargin)
+
+ val fileDvDescriptor =
row.deletionVectorId.map(DeletionVectorDescriptor.fromJson(_))
+ val finalDvDescriptor = fileDvDescriptor match {
+ case Some(existingDvDescriptor) if row.deletedRowIndexCount > 0 =>
+ // Load the existing bit map
+ val existingBitmap =
+ StoredBitmap.create(existingDvDescriptor,
ctx.tablePath).load(ctx.dvStore)
+ val newBitmap = RoaringBitmapArray.readFrom(row.deletedRowIndexSet)
+
+ // Merge both the existing and new bitmaps into one, and finally
persist on disk
+ existingBitmap.merge(newBitmap)
+ storeSerializedBitmap(
+ ctx,
+
existingBitmap.serializeAsByteArray(RoaringBitmapArrayFormat.Portable),
+ existingBitmap.cardinality)
+ case Some(existingDvDescriptor) =>
+ existingDvDescriptor // This is already stored.
+ case None =>
+ // Persist the new bitmap
+ storeSerializedBitmap(ctx, row.deletedRowIndexSet,
row.deletedRowIndexCount)
+ }
+ DeletionVectorResult.fromDeletionVectorData(row, deletionVector =
finalDvDescriptor)
+ }
+}
+
diff --git
a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/DeletionVectorWriteTransformer.scala
b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/DeletionVectorWriteTransformer.scala
new file mode 100644
index 0000000000..2715eff63e
--- /dev/null
+++
b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/DeletionVectorWriteTransformer.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.execution.ValidatablePlan
+import org.apache.gluten.extension.columnar.transition.Convention
+import org.apache.gluten.vectorized.{CHNativeBlock, DeltaWriterJNIWrapper}
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference}
+import org.apache.spark.sql.delta.OptimisticTransaction
+import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
+import org.apache.spark.sql.delta.commands.DeletionVectorResult
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore
+import org.apache.spark.sql.delta.util.{Codec, Utils => DeltaUtils}
+import org.apache.spark.sql.execution.datasources.CallTransformer
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.Utils
+
+import org.apache.hadoop.fs.Path
+
+import java.util.UUID
+import java.util.concurrent.atomic.AtomicLong
+
/**
 * Columnar plan node that hands deletion-vector bitmaps to the native (ClickHouse) DV writer.
 *
 * For each partition it creates one native writer, feeds every incoming native block to it,
 * and emits a single result batch with schema (filePath, deletionVector, matchedRowCount).
 *
 * @param child    columnar child producing (filePath, deletionVectorId, bitmap, cardinality) rows
 * @param table    Delta table root path; DV files are written beneath it
 * @param deltaTxn current optimistic transaction (used for metadata such as prefix length)
 */
case class DeletionVectorWriteTransformer(
    child: SparkPlan,
    table: Path,
    deltaTxn: OptimisticTransaction)
  extends UnaryExecNode
  with ValidatablePlan {
  // Fixed output schema, mirrored by the native side (see DeltaWriter::finalize in cpp-ch).
  override def output: Seq[Attribute] = Seq(
    AttributeReference("filePath", StringType, nullable = false)(),
    AttributeReference(
      "deletionVector",
      DeletionVectorWriteTransformer.deletionVectorType,
      nullable = false
    )(),
    AttributeReference("matchedRowCount", LongType, nullable = false)()
  )

  override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
    assert(child.supportsColumnar)
    val prefixLen = DeltaUtils.getRandomPrefixLength(deltaTxn.metadata)
    val tablePathString = DeletionVectorStore.pathToEscapedString(table)
    val packingTargetSize =
      session.conf.get(DeltaSQLConf.DELETION_VECTOR_PACKING_TARGET_SIZE)

    child.executeColumnar().mapPartitions {
      blockIterator =>
        // One-shot iterator: next() drains the whole partition into a single native
        // writer and returns exactly one result batch; `writer != 0` marks completion.
        val res = new Iterator[ColumnarBatch] {
          // Native writer handle (0 = not created yet).
          var writer: Long = 0
          override def hasNext: Boolean = {
            blockIterator.hasNext && writer == 0
          }

          override def next(): ColumnarBatch = {
            writer = DeltaWriterJNIWrapper
              .createDeletionVectorWriter(tablePathString, prefixLen, packingTargetSize)
            while (blockIterator.hasNext) {
              val n = blockIterator.next()
              val block_address =
                CHNativeBlock.fromColumnarBatch(n).blockAddress()
              DeltaWriterJNIWrapper.deletionVectorWrite(writer, block_address)
            }

            // NOTE(review): finalize presumably releases the native writer; if
            // deletionVectorWrite throws, the handle may leak — confirm on the JNI side.
            val address = DeltaWriterJNIWrapper.deletionVectorWriteFinalize(writer)
            new CHNativeBlock(address).toColumnarBatch
          }
        }
        res
    }
  }

  override protected def withNewChildInternal(newChild: SparkPlan): DeletionVectorWriteTransformer =
    copy(child = newChild)

  override def batchType(): Convention.BatchType = BackendsApiManager.getSettings.primaryBatchType

  override def rowType0(): Convention.RowType = Convention.RowType.None

  // Row-based execution is intentionally unsupported; this node is columnar-only.
  override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException()
}
+
object DeletionVectorWriteTransformer {
  /** Incremented once per native DV write when running under test; asserted by suites. */
  val COUNTER = new AtomicLong(0)

  /** Struct mirroring Delta's DeletionVectorDescriptor fields. */
  private val deletionVectorType: StructType = StructType(
    Seq(
      StructField("storageType", StringType, nullable = false),
      StructField("pathOrInlineDv", StringType, nullable = false),
      StructField("offset", IntegerType, nullable = true),
      StructField("sizeInBytes", IntegerType, nullable = false),
      StructField("cardinality", LongType, nullable = false),
      StructField("maxRowIndex", LongType, nullable = true)
    ))

  /** Base85-encodes a UUID and prepends the random file-name prefix. */
  def encodeUUID(uuid: String, randomPrefix: String): String = {
    val encoded = Codec.Base85Codec.encodeUUID(UUID.fromString(uuid))
    // A Base85-encoded UUID is always exactly 20 characters, which lets the prefix
    // be split off again later without spending an extra separator character.
    assert(encoded.length == 20)
    randomPrefix + encoded
  }

  /** Inverse of [[encodeUUID]] for the UUID part (prefix already stripped). */
  def decodeUUID(encodedUuid: String): String =
    Codec.Base85Codec.decodeUUID(encodedUuid).toString

  /**
   * Runs the native DV write over `aggregated` and converts the collected rows
   * into [[DeletionVectorResult]]s.
   */
  def replace(
      aggregated: DataFrame,
      tablePath: Path,
      deltaTxn: OptimisticTransaction,
      spark: SparkSession): Seq[DeletionVectorResult] = {
    if (Utils.isTesting) {
      COUNTER.incrementAndGet()
    }

    val writeNode =
      DeletionVectorWriteTransformer(aggregated.queryExecution.sparkPlan, tablePath, deltaTxn)
    val collected = CallTransformer(spark, writeNode).executedPlan.executeCollect()

    // Decode one output row: (filePath, deletionVector struct, matchedRowCount).
    def toResult(row: InternalRow): DeletionVectorResult = {
      val dv = row.getStruct(1, 6)
      val offset = if (dv.isNullAt(2)) None else Some(dv.getInt(2))
      val maxRowIndex = if (dv.isNullAt(5)) None else Some(dv.getLong(5))
      DeletionVectorResult(
        row.getString(0),
        DeletionVectorDescriptor(
          dv.getString(0),
          dv.getString(1),
          offset,
          dv.getInt(3),
          dv.getLong(4),
          maxRowIndex
        ),
        row.getLong(2)
      )
    }

    collected.map(toResult).toSeq
  }
}
diff --git
a/backends-clickhouse/src-delta-32/test/scala/org/apache/spark/gluten/delta/GlutenDeltaParquetDeletionVectorSuite.scala
b/backends-clickhouse/src-delta-32/test/scala/org/apache/spark/gluten/delta/GlutenDeltaParquetDeletionVectorSuite.scala
index b96e02af2c..cc9fd16c83 100644
---
a/backends-clickhouse/src-delta-32/test/scala/org/apache/spark/gluten/delta/GlutenDeltaParquetDeletionVectorSuite.scala
+++
b/backends-clickhouse/src-delta-32/test/scala/org/apache/spark/gluten/delta/GlutenDeltaParquetDeletionVectorSuite.scala
@@ -21,12 +21,18 @@ import
org.apache.gluten.execution.{FileSourceScanExecTransformer, GlutenClickHo
import org.apache.spark.SparkConf
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.aggregation.BitmapAggregator
+import org.apache.spark.sql.delta.DeltaConfigs
+import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.delta.deletionvectors.RoaringBitmapArrayFormat
import org.apache.spark.sql.delta.files.TahoeFileIndex
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.execution.DeletionVectorWriteTransformer
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec
import org.apache.spark.sql.functions.col
+import org.apache.hadoop.fs.Path
+
// Some sqls' line length exceeds 100
// scalastyle:off line.size.limit
@@ -52,7 +58,6 @@ class GlutenDeltaParquetDeletionVectorSuite
.set("spark.sql.adaptive.enabled", "true")
.set("spark.sql.files.maxPartitionBytes", "20000000")
.set("spark.sql.storeAssignmentPolicy", "legacy")
- // .setCHConfig("use_local_format", true)
.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
}
@@ -263,6 +268,117 @@ class GlutenDeltaParquetDeletionVectorSuite
}
}
  /**
   * Shared scenario for DV-write tests: create a DV-enabled table from lineitem,
   * DELETE some rows, then UPDATE all rows, verifying aggregates after each step.
   *
   * @param prefix         when true, also enables delta.randomizeFilePrefixes
   * @param deleteCallback run right after the DELETE (by-name, for counter checks)
   * @param updateCallback run right after the UPDATE (by-name, for counter checks)
   */
  def testBasic(prefix: Boolean = false)(deleteCallback: => Unit)(updateCallback: => Unit): Unit = {
    val prefix_str = if (prefix) {
      ", delta.randomizeFilePrefixes=true"
    } else {
      ""
    }

    val tableName = "dv_write_test"
    withTable(tableName) {
      withTempDir {
        dirName =>
          spark.sql(s"""
                       |CREATE TABLE IF NOT EXISTS $tableName
                       |($q1SchemaString)
                       |USING delta
                       |TBLPROPERTIES (delta.enableDeletionVectors='true' $prefix_str)
                       |LOCATION '$dirName/$tableName'
                       |""".stripMargin)

          spark.sql(s"""insert into table $tableName select * from lineitem """.stripMargin)

          // DELETE triggers a deletion-vector write.
          spark.sql(s"""
                       | delete from $tableName
                       | where mod(l_orderkey, 3) = 1 and l_orderkey < 100
                       |""".stripMargin)

          deleteCallback

          // Aggregate over the surviving rows must reflect the deletions.
          val df = spark.sql(s"""select sum(l_linenumber) from $tableName """.stripMargin)
          val result = df.collect()
          assertResult(1802335)(result.apply(0).get(0))

          // UPDATE rewrites rows and also exercises the DV write path.
          spark.sql(s"""update $tableName set l_orderkey = 1 where l_orderkey > 0 """.stripMargin)

          updateCallback
          val df2 = spark.sql(s"""select sum(l_orderkey) from $tableName""".stripMargin)
          val result2 = df2.collect()
          assertResult(600536)(result2.apply(0).get(0))
      }
    }
  }
+
  // Both tests assert that the native DV writer was actually taken (the COUNTER is
  // bumped in DeletionVectorWriteTransformer.replace when Utils.isTesting): once by
  // the DELETE, and again (counter strictly increases) by the UPDATE.
  test("test delta DV write use native writer") {
    var counter = DeletionVectorWriteTransformer.COUNTER.get()
    testBasic() {
      counter = DeletionVectorWriteTransformer.COUNTER.get()
      assertResult(true)(counter > 0)
    } {
      assertResult(true)(DeletionVectorWriteTransformer.COUNTER.get() > counter)
    }
  }

  // Same as above but with randomized file prefixes enabled on the table.
  test("test delta DV write use native writer with prefix") {
    var counter = DeletionVectorWriteTransformer.COUNTER.get()
    testBasic(prefix = true) {
      counter = DeletionVectorWriteTransformer.COUNTER.get()
      assertResult(true)(counter > 0)
    } {
      assertResult(true)(DeletionVectorWriteTransformer.COUNTER.get() > counter)
    }
  }
+
  // Parametrized over the bin-packing target size: small targets force one DV file
  // per AddFile, a mid-size target packs a few DVs per file, and a large target
  // packs all DVs into a single file.
  for (targetDVFileSize <- Seq(2, 200, 2000000)) {
    test(
      s"DELETE with DVs - packing multiple DVs into one file: target max DV file " +
        s"size=$targetDVFileSize") {
      withSQLConf(
        DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.defaultTablePropertyKey -> "true",
        DeltaSQLConf.DELETE_USE_PERSISTENT_DELETION_VECTORS.key -> "true",
        DeltaSQLConf.DELETION_VECTOR_PACKING_TARGET_SIZE.key -> targetDVFileSize.toString,
        "spark.sql.shuffle.partitions" -> "1"
      ) {
        withTempDir {
          dirName =>
            // Create table with 100 files of 2 rows each.
            val numFiles = 100
            val path = dirName.getAbsolutePath
            spark.range(0, 200, step = 1, numPartitions = numFiles).write.format("delta").save(path)
            val tableName = s"delta.`$path`"
            val numFilesWithDVs = 10
            val numDeletedRows = numFilesWithDVs * 1
            // Deletes one row (the even id) from each of the first 10 files.
            spark.sql(s"DELETE FROM $tableName WHERE id % 2 = 0 AND id < 20")

            // Verify the expected number of AddFiles with DVs.
            val allFiles = DeltaLog.forTable(spark, path).unsafeVolatileSnapshot.allFiles.collect()
            assert(allFiles.length === numFiles)
            val addFilesWithDV = allFiles.filter(_.deletionVector != null)
            assert(addFilesWithDV.length === numFilesWithDVs)
            assert(addFilesWithDV.map(_.deletionVector.cardinality).sum == numDeletedRows)

            var expectedDVFileCount = 0
            targetDVFileSize match {
              // Each AddFile will have its own DV file.
              case 2 => expectedDVFileCount = numFilesWithDVs
              // Each DV is about 34 bytes according to the latest format, so roughly
              // floor(200 / 34) DVs fit in one file.
              case 200 => expectedDVFileCount = numFilesWithDVs / (200 / 34).floor.toInt
              // Expect all DVs in one file.
              case 2000000 => expectedDVFileCount = 1
              case default =>
                throw new IllegalStateException(s"Unknown target DV file size: $default")
            }
            // The number of distinct DV files must match the packing expectation above.
            assert(
              addFilesWithDV.map(_.deletionVector.absolutePath(new Path(path))).toSet.size ===
                expectedDVFileCount
            )
        }
      }
    }
  }
+
test("test parquet partition table delete with the delta DV") {
withSQLConf(("spark.sql.sources.partitionOverwriteMode", "dynamic")) {
spark.sql(s"""
diff --git
a/backends-clickhouse/src-delta/main/scala/org/apache/gluten/component/CHDeltaComponent.scala
b/backends-clickhouse/src-delta/main/scala/org/apache/gluten/component/CHDeltaComponent.scala
index 1b187e4a4b..8acdcee777 100644
---
a/backends-clickhouse/src-delta/main/scala/org/apache/gluten/component/CHDeltaComponent.scala
+++
b/backends-clickhouse/src-delta/main/scala/org/apache/gluten/component/CHDeltaComponent.scala
@@ -23,11 +23,18 @@ import
org.apache.gluten.extension.columnar.validator.Validators
import org.apache.gluten.extension.injector.Injector
import org.apache.gluten.sql.shims.DeltaShimLoader
+import org.apache.spark.SparkContext
+import org.apache.spark.api.plugin.PluginContext
+
class CHDeltaComponent extends Component {
override def name(): String = "ch-delta"
override def buildInfo(): Component.BuildInfo =
Component.BuildInfo("CHDelta", "N/A", "N/A", "N/A")
override def dependencies(): Seq[Class[_ <: Component]] = classOf[CHBackend]
:: Nil
+
  // Forward the driver-start hook to the version-specific Delta shim so it can perform
  // one-time initialization (the DV-write feature registers native/JNI state here).
  override def onDriverStart(sc: SparkContext, pc: PluginContext): Unit =
    DeltaShimLoader.getDeltaShims.onDriverStart(sc, pc)
+
override def injectRules(injector: Injector): Unit = {
val legacy = injector.gluten.legacy
legacy.injectTransform {
diff --git
a/backends-clickhouse/src-delta/main/scala/org/apache/gluten/sql/shims/DeltaShims.scala
b/backends-clickhouse/src-delta/main/scala/org/apache/gluten/sql/shims/DeltaShims.scala
index ee263910dd..b88c8718cd 100644
---
a/backends-clickhouse/src-delta/main/scala/org/apache/gluten/sql/shims/DeltaShims.scala
+++
b/backends-clickhouse/src-delta/main/scala/org/apache/gluten/sql/shims/DeltaShims.scala
@@ -18,13 +18,13 @@ package org.apache.gluten.sql.shims
import org.apache.gluten.execution.GlutenPlan
+import org.apache.spark.SparkContext
+import org.apache.spark.api.plugin.PluginContext
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.PartitionedFile
import java.util.{HashMap => JHashMap, Map => JMap}
-sealed abstract class ShimDescriptor
-
trait DeltaShims {
def supportDeltaOptimizedWriterExec(plan: SparkPlan): Boolean = false
@@ -33,6 +33,8 @@ trait DeltaShims {
s"Can't transform ColumnarDeltaOptimizedWriterExec from
${plan.getClass.getSimpleName}")
}
  /** Hook invoked when the Spark driver starts; no-op by default, shims may override. */
  def onDriverStart(sc: SparkContext, pc: PluginContext): Unit = {}
+
def registerExpressionExtension(): Unit = {}
def convertRowIndexFilterIdEncoded(
diff --git
a/cpp-ch/local-engine/AggregateFunctions/AggregateFunctionDVRoaringBitmap.h
b/cpp-ch/local-engine/AggregateFunctions/AggregateFunctionDVRoaringBitmap.h
index a60184a5be..de338ce6f0 100644
--- a/cpp-ch/local-engine/AggregateFunctions/AggregateFunctionDVRoaringBitmap.h
+++ b/cpp-ch/local-engine/AggregateFunctions/AggregateFunctionDVRoaringBitmap.h
@@ -36,7 +36,7 @@ struct AggregateFunctionDVRoaringBitmapData
void insertResultInto(DB::ColumnInt64 & cardinality, DB::ColumnInt64 &
last, DB::ColumnString & bitmap)
{
- cardinality.getData().push_back(roaring_bitmap_array.rb_size());
+ cardinality.getData().push_back(roaring_bitmap_array.cardinality());
auto last_value = roaring_bitmap_array.last();
if (last_value.has_value())
last.getData().push_back(last_value.value());
diff --git
a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/Bitmap/DeltaDVRoaringBitmapArray.cpp
b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/Bitmap/DeltaDVRoaringBitmapArray.cpp
index e238a2b362..c45d8d787d 100644
---
a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/Bitmap/DeltaDVRoaringBitmapArray.cpp
+++
b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/Bitmap/DeltaDVRoaringBitmapArray.cpp
@@ -17,13 +17,14 @@
#include "DeltaDVRoaringBitmapArray.h"
#include <zlib.h>
+#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
+#include <Storages/SubstraitSource/ReadBufferBuilder.h>
+#include <substrait/plan.pb.h>
#include <roaring.hh>
#include <Poco/URI.h>
#include <Common/PODArray.h>
-#include <substrait/plan.pb.h>
-#include <Storages/SubstraitSource/ReadBufferBuilder.h>
namespace DB
{
@@ -82,7 +83,7 @@ void DeltaDVRoaringBitmapArray::rb_read(const String &
file_path, Int32 offset,
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Checksum
mismatch.");
}
-UInt64 DeltaDVRoaringBitmapArray::rb_size() const
+UInt64 DeltaDVRoaringBitmapArray::cardinality() const
{
UInt64 sum = 0;
for (const auto & r : roaring_bitmap_array)
@@ -137,6 +138,15 @@ void DeltaDVRoaringBitmapArray::rb_merge(const
DeltaDVRoaringBitmapArray & that)
{
rb_or(that);
}
+
/// Merges another bitmap, given in its serialized form (bytes as produced by
/// serialize()), into this one by deserializing it and OR-ing via rb_merge().
void DeltaDVRoaringBitmapArray::merge(const String & that)
{
    DB::ReadBufferFromString rb(that);
    DeltaDVRoaringBitmapArray that_bitmap;
    that_bitmap.deserialize(rb);
    rb_merge(that_bitmap);
}
+
void DeltaDVRoaringBitmapArray::rb_or(const DeltaDVRoaringBitmapArray & that)
{
if (roaring_bitmap_array.size() < that.roaring_bitmap_array.size())
diff --git
a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/Bitmap/DeltaDVRoaringBitmapArray.h
b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/Bitmap/DeltaDVRoaringBitmapArray.h
index 067ec5db32..a2d80080e1 100644
---
a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/Bitmap/DeltaDVRoaringBitmapArray.h
+++
b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/Bitmap/DeltaDVRoaringBitmapArray.h
@@ -24,11 +24,14 @@
namespace local_engine
{
+
+static constexpr auto DV_FILE_FORMAT_VERSION_ID_V1 = std::byte{1};
+
/**
* Roaring bitmap data.
* For a description of the roaring_bitmap_t, see:
https://github.com/RoaringBitmap/CRoaring
*/
-class DeltaDVRoaringBitmapArray : private boost::noncopyable
+class DeltaDVRoaringBitmapArray
{
static constexpr Int64 MAX_REPRESENTABLE_VALUE
= (static_cast<UInt64>(INT32_MAX - 1) << 32) |
(static_cast<UInt64>(INT32_MIN) & 0xFFFFFFFFL);
@@ -43,13 +46,14 @@ public:
explicit DeltaDVRoaringBitmapArray();
~DeltaDVRoaringBitmapArray() = default;
bool operator==(const DeltaDVRoaringBitmapArray & other) const;
- UInt64 rb_size() const;
+ UInt64 cardinality() const;
void rb_read(const String & file_path, Int32 offset, Int32 data_size,
DB::ContextPtr context);
bool rb_contains(Int64 x) const;
bool rb_is_empty() const;
void rb_clear();
void rb_add(Int64 value);
void rb_merge(const DeltaDVRoaringBitmapArray & that);
+ void merge(const String & that);
void rb_or(const DeltaDVRoaringBitmapArray & that);
String serialize() const;
void deserialize(DB::ReadBuffer & buf);
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaUtil.cpp
b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaUtil.cpp
new file mode 100644
index 0000000000..725e3446a2
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaUtil.cpp
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "DeltaUtil.h"
+
+#include <Compression/CompressedReadBuffer.h>
+#include <Core/Block.h>
+#include <IO/ReadBuffer.h>
+#include <jni/jni_common.h>
+#include <Common/JNIUtils.h>
+
+namespace local_engine::delta
+{
+
+jclass DeltaUtil::delta_jni_class = nullptr;
+jmethodID DeltaUtil::delta_jni_encode_uuid = nullptr;
+jmethodID DeltaUtil::delta_jni_decode_uuid = nullptr;
+
/// Caches a global reference to DeltaWriterJNIWrapper and the static method IDs used by
/// encodeUUID()/decodeUUID(). Must be called (from JNI init on the driver/executor) before
/// either of those functions is used; pair with releaseJavaCallerReference() on shutdown.
void DeltaUtil::initJavaCallerReference(JNIEnv * env)
{
    delta_jni_class = CreateGlobalClassReference(env, "Lorg/apache/gluten/vectorized/DeltaWriterJNIWrapper;");
    delta_jni_encode_uuid
        = GetStaticMethodID(env, delta_jni_class, "encodeUUID", "(Ljava/lang/String;Ljava/lang/String;)Ljava/lang/String;");
    delta_jni_decode_uuid = GetStaticMethodID(env, delta_jni_class, "decodeUUID", "(Ljava/lang/String;)Ljava/lang/String;");
}
+
+void DeltaUtil::releaseJavaCallerReference(JNIEnv * env)
+{
+ if (delta_jni_class)
+ env->DeleteGlobalRef(delta_jni_class);
+}
+
/// Calls the JVM-side DeltaWriterJNIWrapper.encodeUUID (Base85 + random prefix) and
/// returns the encoded string. Requires initJavaCallerReference() to have run first.
/// NOTE(review): jResult is not checked for null / pending Java exception before use —
/// a throwing encodeUUID would crash here; confirm whether GET_JNIENV handles this.
String DeltaUtil::encodeUUID(String uuid, String prefix)
{
    GET_JNIENV(env)
    jstring jUuid = env->NewStringUTF(uuid.c_str());
    jstring jRandomPrefix = env->NewStringUTF(prefix.c_str());

    // Call the static Java method
    jstring jResult = (jstring)env->CallStaticObjectMethod(delta_jni_class, delta_jni_encode_uuid, jUuid, jRandomPrefix);
    const char * resultCStr = env->GetStringUTFChars(jResult, nullptr);
    std::string encode(resultCStr);
    // Release the JVM-owned chars and all local refs before leaving the JNI frame.
    env->ReleaseStringUTFChars(jResult, resultCStr);
    env->DeleteLocalRef(jUuid);
    env->DeleteLocalRef(jRandomPrefix);
    env->DeleteLocalRef(jResult);
    CLEAN_JNIENV
    return encode;
}
+
/// Calls the JVM-side DeltaWriterJNIWrapper.decodeUUID to recover the UUID string from
/// its Base85-encoded form. Requires initJavaCallerReference() to have run first.
/// NOTE(review): as in encodeUUID, jResult is used without a null/exception check.
String DeltaUtil::decodeUUID(String encodedUuid)
{
    GET_JNIENV(env)
    jstring j_encoded_uuid = env->NewStringUTF(encodedUuid.c_str());
    jstring jResult = (jstring)env->CallStaticObjectMethod(delta_jni_class, delta_jni_decode_uuid, j_encoded_uuid);
    const char * resultCStr = env->GetStringUTFChars(jResult, nullptr);
    std::string decode_uuid(resultCStr);

    env->ReleaseStringUTFChars(jResult, resultCStr);
    env->DeleteLocalRef(j_encoded_uuid);
    env->DeleteLocalRef(jResult);
    CLEAN_JNIENV
    return decode_uuid;
}
+
+}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaUtil.h
b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaUtil.h
new file mode 100644
index 0000000000..d4f80b7d05
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaUtil.h
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <jni.h>
+#include <jni/jni_common.h>
+
+
+namespace local_engine::delta
+{
/// Mirrors constants of org.apache.spark.sql.delta.util.Codec on the JVM side;
/// must stay in sync with Delta's Base85 UUID encoding.
struct Codec
{
    struct Base85Codec
    {
        /// A Base85-encoded UUID is always exactly 20 characters.
        static constexpr Int32 ENCODED_UUID_LENGTH = 20;
    };
};


/// Storage-type marker for a deletion vector stored in its own file addressed by UUID
/// (matches DeletionVectorDescriptor.UUID_DV_MARKER in Delta).
static constexpr String UUID_DV_MARKER = "u";

/// Thin JNI bridge to DeltaWriterJNIWrapper for UUID encode/decode.
/// Call initJavaCallerReference() once before use and releaseJavaCallerReference()
/// on shutdown; the cached class/method handles are process-wide statics.
class DeltaUtil
{
public:
    static void initJavaCallerReference(JNIEnv * env);
    static void releaseJavaCallerReference(JNIEnv * env);

    static String encodeUUID(String uuid, String prefix);
    static String decodeUUID(String encodedUuid);

private:
    static jclass delta_jni_class;
    static jmethodID delta_jni_encode_uuid;
    static jmethodID delta_jni_decode_uuid;
};
+
+}
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.cpp
b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.cpp
new file mode 100644
index 0000000000..926c3ab215
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.cpp
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
#include "DeltaWriter.h"

#include <zlib.h>
#include <random>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/IColumn.h>
#include <Core/Block_fwd.h>
#include <Core/Range.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypesNumber.h>
#include <Formats/JSONUtils.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>
#include <Storages/Output/WriteBufferBuilder.h>
#include <Storages/SubstraitSource/Delta/DeltaUtil.h>
#include <rapidjson/document.h>
#include <Poco/URI.h>
+
+namespace local_engine::delta
+{
+
+String getRandomPrefix(const size_t & length)
+{
+ static const char alphanum[] = "0123456789"
+ "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
+ "abcdefghijklmnopqrstuvwxyz";
+
+ String res;
+ for (size_t i = 0; i < length; ++i)
+ res += alphanum[rand() % (sizeof(alphanum) - 1)];
+ return res;
+}
+
+DB::DataTypePtr getDeletionVectorType()
+{
+ DB::DataTypes dv_descriptor_types;
+ auto storageType = std::make_shared<DB::DataTypeString>();
+ auto pathOrInlineDv = std::make_shared<DB::DataTypeString>();
+ auto offset = std::make_shared<DB::DataTypeInt32>();
+ auto offset_nullable = std::make_shared<DB::DataTypeNullable>(offset);
+ auto sizeInBytes = std::make_shared<DB::DataTypeInt32>();
+ auto cardinality = std::make_shared<DB::DataTypeInt64>();
+ auto maxRowIndex = std::make_shared<DB::DataTypeInt64>();
+ auto maxRowIndex_nullable =
std::make_shared<DB::DataTypeNullable>(maxRowIndex);
+
+ dv_descriptor_types.emplace_back(storageType);
+ dv_descriptor_types.emplace_back(pathOrInlineDv);
+ dv_descriptor_types.emplace_back(offset_nullable);
+ dv_descriptor_types.emplace_back(sizeInBytes);
+ dv_descriptor_types.emplace_back(cardinality);
+ dv_descriptor_types.emplace_back(maxRowIndex_nullable);
+
+ DB::Strings dv_descriptor_names;
+ dv_descriptor_names.emplace_back("storageType");
+ dv_descriptor_names.emplace_back("pathOrInlineDv");
+ dv_descriptor_names.emplace_back("offset");
+ dv_descriptor_names.emplace_back("sizeInBytes");
+ dv_descriptor_names.emplace_back("cardinality");
+ dv_descriptor_names.emplace_back("maxRowIndex");
+
+ return std::make_shared<DB::DataTypeTuple>(dv_descriptor_types,
dv_descriptor_names);
+}
+
+
+void DeltaWriter::writeDeletionVector(const DB::Block & block)
+{
+ const auto & file_path_columns = block.getByPosition(0);
+ const auto & deletion_vector_id_columns = block.getByPosition(1);
+ const auto & bitmap_columns = block.getByPosition(2);
+ const auto & cardinality_src_columns = block.getByPosition(3);
+
+ for (size_t row_idx = 0; row_idx < block.rows(); row_idx++)
+ {
+ const auto file_path = file_path_columns.column->getDataAt(row_idx);
+ auto bitmap = bitmap_columns.column->getDataAt(row_idx);
+ auto cardinality = cardinality_src_columns.column->get64(row_idx); //
alisa deletedRowIndexCount
+
+ if (size_of_current_bin > 0 && bitmap.size + size_of_current_bin >
packing_target_size)
+ {
+ write_buffer->finalize();
+ write_buffer = nullptr;
+ }
+
+ if (!deletion_vector_id_columns.column->isNullAt(row_idx))
+ {
+ DB::Field deletion_vector_id_field;
+ deletion_vector_id_columns.column->get(row_idx,
deletion_vector_id_field);
+ auto existing_deletion_vector_id =
deletion_vector_id_field.safeGet<String>();
+
+ if (!existing_deletion_vector_id.empty())
+ {
+ rapidjson::Document existingDvDescriptor;
+
existingDvDescriptor.Parse(existing_deletion_vector_id.c_str());
+
+ String existing_path_or_inline_dv =
existingDvDescriptor["pathOrInlineDv"].GetString();
+ Int32 existing_offset =
existingDvDescriptor["offset"].GetInt();
+ Int32 existing_size_in_bytes =
existingDvDescriptor["sizeInBytes"].GetInt();
+ Int64 existing_cardinality =
existingDvDescriptor["cardinality"].GetInt64();
+
+ if (cardinality > 0)
+ {
+ DeltaDVRoaringBitmapArray existing_bitmap
+ =
deserializeExistingBitmap(existing_path_or_inline_dv, existing_offset,
existing_size_in_bytes, table_path);
+ existing_bitmap.merge(bitmap.toString());
+ bitmap = existing_bitmap.serialize();
+ cardinality = existing_bitmap.cardinality();
+ }
+ else
+ {
+ // use already existing deletion vector
+ auto dv_descriptor_field =
createDeletionVectorDescriptorField(
+ existing_path_or_inline_dv, existing_offset,
existing_size_in_bytes, existing_cardinality);
+ file_path_column->insert(file_path.data);
+ dv_descriptor_column->insert(dv_descriptor_field);
+ matched_row_count_col->insert(cardinality);
+ continue;
+ }
+ }
+ }
+
+ if (!write_buffer)
+ initBinPackage();
+
+ size_of_current_bin = size_of_current_bin + bitmap.size;
+ Int32 bitmap_size = static_cast<Int32>(bitmap.size);
+ DB::writeBinaryBigEndian(bitmap_size, *write_buffer);
+
+ write_buffer->write(bitmap.data, bitmap.size);
+ Int32 checksum_value = static_cast<Int32>(crc32_z(0L,
reinterpret_cast<const unsigned char *>(bitmap.data), bitmap_size));
+ DB::writeBinaryBigEndian(checksum_value, *write_buffer);
+
+ auto dv_descriptor_field
+ = createDeletionVectorDescriptorField(DeltaUtil::encodeUUID(uuid,
prefix), offset, bitmap_size, cardinality);
+
+ file_path_column->insert(file_path.data);
+ dv_descriptor_column->insert(dv_descriptor_field);
+ matched_row_count_col->insert(cardinality);
+
+ offset = sizeof(bitmap_size) + bitmap_size + sizeof(checksum_value) +
offset;
+ }
+}
+
/// Flushes the last open bin file (if any) and assembles the result block with schema
/// (filePath, deletionVector, matchedRowCount) consumed on the JVM side.
/// Ownership of the returned heap-allocated Block transfers to the caller
/// (released through the JNI layer); the mutable result columns are moved out,
/// so the writer must not be reused afterwards.
DB::Block * DeltaWriter::finalize()
{
    if (size_of_current_bin > 0)
        write_buffer->finalize();

    DB::Block * res = new DB::Block(
        {DB::ColumnWithTypeAndName(std::move(file_path_column), std::make_shared<DB::DataTypeString>(), "filePath"),
         DB::ColumnWithTypeAndName(std::move(dv_descriptor_column), getDeletionVectorType(), "deletionVector"),
         DB::ColumnWithTypeAndName(std::move(matched_row_count_col), std::make_shared<DB::DataTypeInt64>(), "matchedRowCount")});

    return res;
}
+
+
+DB::ColumnTuple::MutablePtr DeltaWriter::createDeletionVectorDescriptorColumn()
+{
+ DB::MutableColumns dv_descriptor_mutable_columns;
+ dv_descriptor_mutable_columns.emplace_back(DB::ColumnString::create()); //
storageType
+ dv_descriptor_mutable_columns.emplace_back(DB::ColumnString::create()); //
pathOrInlineDv
+
dv_descriptor_mutable_columns.emplace_back(DB::ColumnNullable::create(DB::ColumnInt32::create(),
DB::ColumnUInt8::create())); // offset
+ dv_descriptor_mutable_columns.emplace_back(DB::ColumnInt32::create()); //
sizeInBytes
+ dv_descriptor_mutable_columns.emplace_back(DB::ColumnInt64::create()); //
cardinality
+ dv_descriptor_mutable_columns.emplace_back(
+ DB::ColumnNullable::create(DB::ColumnInt64::create(),
DB::ColumnUInt8::create())); // maxRowIndex
+
+ return DB::ColumnTuple::create(std::move(dv_descriptor_mutable_columns));
+}
+
+String DeltaWriter::assembleDeletionVectorPath(const String & table_path,
const String & prefix, const String & uuid) const
+{
+ String path = table_path + "/";
+ if (!prefix.empty())
+ path += prefix + "/";
+
+ path += DELETION_VECTOR_FILE_NAME_CORE + "_" + uuid + ".bin";
+ return path;
+}
+
+std::unique_ptr<DB::WriteBuffer> DeltaWriter::createWriteBuffer(const String &
table_path, const String & prefix, const String & uuid) const
+{
+ String dv_file = assembleDeletionVectorPath(table_path, prefix, uuid);
+
+ std::string encoded;
+ Poco::URI::encode(dv_file, "", encoded);
+ const Poco::URI poco_uri(encoded);
+ const auto write_buffer_builder =
WriteBufferBuilderFactory::instance().createBuilder(poco_uri.getScheme(),
context);
+ return write_buffer_builder->build(poco_uri.toString());
+}
+
+DeltaDVRoaringBitmapArray DeltaWriter::deserializeExistingBitmap(
+ const String & existing_path_or_inline_dv,
+ const Int32 & existing_offset,
+ const Int32 & existing_size_in_bytes,
+ const String & table_path) const
+{
+ const auto random_prefix_length = existing_path_or_inline_dv.length() -
Codec::Base85Codec::ENCODED_UUID_LENGTH;
+ const auto randomPrefix = existing_path_or_inline_dv.substr(0,
random_prefix_length);
+ const auto encoded_uuid =
existing_path_or_inline_dv.substr(random_prefix_length);
+ const auto existing_decode_uuid = DeltaUtil::decodeUUID(encoded_uuid);
+ const String existing_dv_file = assembleDeletionVectorPath(table_path,
randomPrefix, existing_decode_uuid);
+ DeltaDVRoaringBitmapArray existing_bitmap;
+ existing_bitmap.rb_read(existing_dv_file, existing_offset,
existing_size_in_bytes, context);
+ return existing_bitmap;
+}
+
+DB::Tuple DeltaWriter::createDeletionVectorDescriptorField(
+ const String & path_or_inline_dv, const Int32 & offset, const Int32 &
size_in_bytes, const Int64 & cardinality)
+{
+ DB::Tuple tuple;
+ tuple.emplace_back(UUID_DV_MARKER); // storageType
+ tuple.emplace_back(path_or_inline_dv); // pathOrInlineDv
+ tuple.emplace_back(offset); // offset
+ tuple.emplace_back(size_in_bytes); // sizeInBytes
+ tuple.emplace_back(cardinality); // cardinality
+ tuple.emplace_back(DB::Field{}); // maxRowIndex
+ return tuple;
+}
+
/// Starts a new deletion-vector bin file: resets the per-bin counters, draws a
/// fresh random prefix and UUID for the file name, opens the write buffer and
/// writes the DV file format version header.
void DeltaWriter::initBinPackage()
{
    offset = 0;
    size_of_current_bin = 0;
    prefix = getRandomPrefix(prefix_length);
    uuid = DB::toString(DB::UUIDHelpers::generateV4());
    write_buffer = createWriteBuffer(table_path, prefix, uuid);
    DB::writeIntBinary(DV_FILE_FORMAT_VERSION_ID_V1, *write_buffer);
    // NOTE(review): offset advances by exactly 1, which assumes the version id
    // serializes as a single byte; writeIntBinary writes sizeof(T) bytes, so
    // confirm DV_FILE_FORMAT_VERSION_ID_V1 is a one-byte type.
    offset++;
}
+
+
+}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.h
b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.h
new file mode 100644
index 0000000000..f6d7372ff4
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.h
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <Columns/ColumnString.h>
+#include <Columns/ColumnTuple.h>
+#include <Columns/ColumnsNumber.h>
+#include <Core/Block.h>
+#include <Core/Field.h>
+#include <Interpreters/Context_fwd.h>
+#include <Storages/SubstraitSource/Delta/Bitmap/DeltaDVRoaringBitmapArray.h>
+
+namespace local_engine::delta
+{
+
/// NOTE(review): empty placeholder -- no fields and no uses visible in this
/// file; confirm whether it is still needed or should be removed.
struct DeletionVectorDescriptor
{
};
+
+class DeltaWriter
+{
+ static constexpr String UUID_DV_MARKER = "u";
+ static constexpr String DELETION_VECTOR_FILE_NAME_CORE = "deletion_vector";
+
+public:
+ explicit DeltaWriter(
+ const DB::ContextPtr & context_, const String & table_path_, const
size_t & prefix_length_, const size_t & packing_target_size_)
+ : context(context_), table_path(table_path_),
prefix_length(prefix_length_), packing_target_size(packing_target_size_)
+ {
+ file_path_column = DB::ColumnString::create();
+ dv_descriptor_column = createDeletionVectorDescriptorColumn();
+ matched_row_count_col = DB::ColumnInt64::create();
+ }
+ void
+ writeDeletionVector(const DB::Block & block);
+
+ DB::Block * finalize();
+
+private:
+ DB::ColumnTuple::MutablePtr createDeletionVectorDescriptorColumn();
+ String assembleDeletionVectorPath(const String & table_path, const String
& prefix, const String & uuid) const;
+ std::unique_ptr<DB::WriteBuffer> createWriteBuffer(const String &
table_path, const String & prefix, const String & uuid) const;
+ DeltaDVRoaringBitmapArray deserializeExistingBitmap(
+ const String & existing_path_or_inline_dv,
+ const Int32 & existing_offset,
+ const Int32 & existing_size_in_bytes,
+ const String & table_path) const;
+ DB::Tuple createDeletionVectorDescriptorField(
+ const String & path_or_inline_dv, const Int32 & offset, const Int32 &
size_in_bytes, const Int64 & cardinality);
+
+ void initBinPackage();
+
+ const DB::ContextPtr & context;
+ const String table_path;
+ const size_t prefix_length;
+ const size_t packing_target_size;
+
+ DB::MutableColumnPtr file_path_column;
+ DB::MutableColumnPtr dv_descriptor_column;
+ DB::MutableColumnPtr matched_row_count_col;
+ std::unique_ptr<DB::WriteBuffer> write_buffer;
+
+ size_t offset = 0;
+ size_t size_of_current_bin = 0;
+ String prefix;
+ String uuid;
+};
+
+
+}
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp
b/cpp-ch/local-engine/local_engine_jni.cpp
index a4da80ae93..e3ac674360 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -46,6 +46,8 @@
#include <Storages/MergeTree/StorageMergeTreeFactory.h>
#include <Storages/Output/BlockStripeSplitter.h>
#include <Storages/Output/NormalFileWriter.h>
+#include <Storages/SubstraitSource/Delta/DeltaWriter.h>
+#include <Storages/SubstraitSource/Delta/DeltaUtil.h>
#include <Storages/SubstraitSource/ReadBufferBuilder.h>
#include <jni/SharedPointerWrapper.h>
#include <jni/jni_common.h>
@@ -198,6 +200,7 @@ JNIEXPORT void
Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_n
local_engine::BroadCastJoinBuilder::destroy(env);
local_engine::SparkMergeTreeWriterJNI::destroy(env);
local_engine::SparkRowInfoJNI::destroy(env);
+ local_engine::delta::DeltaUtil::releaseJavaCallerReference(env);
env->DeleteGlobalRef(block_stripes_class);
env->DeleteGlobalRef(split_result_class);
@@ -1334,6 +1337,46 @@ JNIEXPORT void
Java_org_apache_gluten_execution_CHNativeCacheManager_removeFiles
LOCAL_ENGINE_JNI_METHOD_END(env, );
}
+JNIEXPORT jlong
Java_org_apache_gluten_vectorized_DeltaWriterJNIWrapper_createDeletionVectorWriter(
+ JNIEnv * env, jclass, jstring table_path_, jint prefix_length_, jlong
packingTargetSize_)
+{
+ LOCAL_ENGINE_JNI_METHOD_START
+ auto table_path = jstring2string(env, table_path_);
+
+ const auto query_context =
local_engine::QueryContext::instance().currentQueryContext();
+ auto writer = new local_engine::delta::DeltaWriter(query_context,
table_path, prefix_length_, packingTargetSize_);
+ return reinterpret_cast<jlong>(writer);
+ LOCAL_ENGINE_JNI_METHOD_END(env, -1);
+}
+
+JNIEXPORT void
Java_org_apache_gluten_vectorized_DeltaWriterJNIWrapper_deletionVectorWrite(
+ JNIEnv * env, jclass, jlong writer_address_, jlong blockAddress)
+{
+ LOCAL_ENGINE_JNI_METHOD_START
+ const auto * block = reinterpret_cast<DB::Block *>(blockAddress);
+ auto * writer = reinterpret_cast<local_engine::delta::DeltaWriter
*>(writer_address_);
+ writer->writeDeletionVector(*block);
+ LOCAL_ENGINE_JNI_METHOD_END(env, );
+}
+
+JNIEXPORT jlong
+Java_org_apache_gluten_vectorized_DeltaWriterJNIWrapper_deletionVectorWriteFinalize(JNIEnv
* env, jclass, jlong writer_address_)
+{
+ LOCAL_ENGINE_JNI_METHOD_START
+ auto * writer = reinterpret_cast<local_engine::delta::DeltaWriter
*>(writer_address_);
+ auto * column_batch = writer->finalize();
+ delete writer;
+ return reinterpret_cast<UInt64>(column_batch);
+ LOCAL_ENGINE_JNI_METHOD_END(env, -1);
+}
+
/// JNI: caches global Java class/method references used by DeltaUtil callbacks.
/// Paired with DeltaUtil::releaseJavaCallerReference invoked from the engine's
/// JNI destroy path.
JNIEXPORT void Java_org_apache_gluten_vectorized_DeltaWriterJNIWrapper_registerNativeReference(JNIEnv * env, jclass)
{
    LOCAL_ENGINE_JNI_METHOD_START
    local_engine::delta::DeltaUtil::initJavaCallerReference(env);
    LOCAL_ENGINE_JNI_METHOD_END(env, );
}
+
#ifdef __cplusplus
}
diff --git a/cpp-ch/local-engine/tests/gtest_clickhouse_roaring_bitmap.cpp
b/cpp-ch/local-engine/tests/gtest_clickhouse_roaring_bitmap.cpp
index 3dd13c9ccd..d5194c9f01 100644
--- a/cpp-ch/local-engine/tests/gtest_clickhouse_roaring_bitmap.cpp
+++ b/cpp-ch/local-engine/tests/gtest_clickhouse_roaring_bitmap.cpp
@@ -176,7 +176,7 @@ TEST(Delta_DV, DeltaDVRoaringBitmapArray)
DeltaDVRoaringBitmapArray bitmap_array2{};
bitmap_array2.rb_read(file_uri2, 1, 4047, context);
EXPECT_FALSE(bitmap_array2.rb_is_empty());
- EXPECT_EQ(2098, bitmap_array2.rb_size());
+ EXPECT_EQ(2098, bitmap_array2.cardinality());
EXPECT_TRUE(bitmap_array2.rb_contains(0));
EXPECT_TRUE(bitmap_array2.rb_contains(1003));
EXPECT_TRUE(bitmap_array2.rb_contains(880));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]