This is an automated email from the ASF dual-hosted git repository.
zhztheplayer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 97f06b41d0 [VL][Delta] Add DV scan info extraction utility (#12197)
97f06b41d0 is described below
commit 97f06b41d07104455388ac764a075166e74f33e3
Author: Mohammad Linjawi <[email protected]>
AuthorDate: Wed Jun 3 16:12:03 2026 +0300
[VL][Delta] Add DV scan info extraction utility (#12197)
* [VL][Delta] Add DV scan info extraction utility
* [VL][Delta] Hide DV descriptor from scan info API
* [VL][Delta] Reorder DV scan info fields
---------
Co-authored-by: Mohammad Linjawi <[email protected]>
---
.../gluten/delta/DeltaDeletionVectorScanInfo.scala | 207 ++++++++++++++++++++
.../delta/DeltaDeletionVectorScanInfoSuite.scala | 154 +++++++++++++++
.../gluten/delta/DeltaDeletionVectorScanInfo.scala | 217 +++++++++++++++++++++
.../delta/DeltaDeletionVectorScanInfoSuite.scala | 154 +++++++++++++++
4 files changed, 732 insertions(+)
diff --git a/backends-velox/src-delta33/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala b/backends-velox/src-delta33/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
new file mode 100644
index 0000000000..8ada3d755c
--- /dev/null
+++ b/backends-velox/src-delta33/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.delta
+
+import org.apache.gluten.sql.shims.SparkShimLoader
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.delta.GlutenDeltaParquetFileFormat
+import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
+import org.apache.spark.sql.delta.deletionvectors.{RoaringBitmapArrayFormat,
StoredBitmap}
+import org.apache.spark.sql.delta.storage.dv.HadoopFileSystemDVStore
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+
+import org.apache.hadoop.fs.Path
+
+import java.util.{ArrayList => JArrayList}
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+/**
+ * Builds the per-file scan information a native scan needs to honor Delta deletion vectors.
+ *
+ * For every [[PartitionedFile]], the Delta row-index-filter entries are stripped from the file's
+ * other constant metadata columns. When those entries are present, the referenced deletion
+ * vector is loaded from storage and re-serialized in the portable RoaringBitmapArray format.
+ */
+object DeltaDeletionVectorScanInfo {
+
+  /** Row index filter types; the names match the values accepted from split metadata. */
+  object RowIndexFilterType extends Enumeration {
+    type RowIndexFilterType = Value
+    val KEEP_ALL, IF_CONTAINED, IF_NOT_CONTAINED = Value
+  }
+
+  import RowIndexFilterType._
+  import org.apache.hadoop.conf.Configuration
+
+  /**
+   * Deletion vector details for a single file.
+   *
+   * @param hasDeletionVector whether the split metadata referenced a deletion vector
+   * @param rowIndexFilterType how the row indexes in the deletion vector are to be applied
+   * @param cardinality cardinality recorded in the deletion vector descriptor (0 when absent)
+   * @param serializedDeletionVector the bitmap serialized with
+   *   `RoaringBitmapArrayFormat.Portable`, or empty when there is no deletion vector. Because it
+   *   is an `Array`, it is compared by reference in this case class's `equals`/`hashCode`.
+   */
+  final case class DeletionVectorInfo(
+      hasDeletionVector: Boolean,
+      rowIndexFilterType: RowIndexFilterType,
+      cardinality: Long,
+      serializedDeletionVector: Array[Byte])
+
+  /**
+   * Scan information for a single file.
+   *
+   * @param normalizedOtherMetadataColumns the file's other constant metadata columns, without the
+   *   Delta row-index-filter entries
+   * @param deletionVectorInfo the file's deletion vector details
+   */
+  final case class PartitionFileScanInfo(
+      normalizedOtherMetadataColumns: Map[String, Object],
+      deletionVectorInfo: DeletionVectorInfo)
+
+  private val RowIndexFilterIdEncoded =
+    GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED
+  private val RowIndexFilterTypeKey =
+    GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE
+
+  /**
+   * Extracts the scan information for one file.
+   *
+   * @param spark session whose Hadoop configuration is used to access the table's storage
+   * @param partitionColumnCount number of partition directory levels expected between the table
+   *   root and the data file
+   * @param file the file to inspect
+   * @throws IllegalStateException if only one of the two row-index-filter entries is present,
+   *   the filter type is unknown, or the table root cannot be resolved
+   * @throws IllegalArgumentException if the encoded deletion vector descriptor cannot be parsed
+   */
+  def extract(
+      spark: SparkSession,
+      partitionColumnCount: Int,
+      file: PartitionedFile): PartitionFileScanInfo = {
+    lazy val hadoopConf = spark.sessionState.newHadoopConf()
+    extractWith(hadoopConf, partitionColumnCount, file)
+  }
+
+  /** Extracts the scan information for each of `files`, in order; see [[extract]]. */
+  def extractAll(
+      spark: SparkSession,
+      partitionColumnCount: Int,
+      files: Seq[PartitionedFile]): Seq[PartitionFileScanInfo] = {
+    // newHadoopConf() builds a fresh Configuration from the session on every call. Build it at
+    // most once per batch, and only if some file actually references a deletion vector.
+    lazy val hadoopConf = spark.sessionState.newHadoopConf()
+    files.map(extractWith(hadoopConf, partitionColumnCount, _))
+  }
+
+  /** Java-friendly variant of [[extractAll]]; returns a new mutable `java.util.ArrayList`. */
+  def extractAllFromJava(
+      spark: SparkSession,
+      partitionColumnCount: Int,
+      files: java.util.List[PartitionedFile]): java.util.List[PartitionFileScanInfo] = {
+    new JArrayList[PartitionFileScanInfo](
+      extractAll(spark, partitionColumnCount, files.asScala.toSeq).asJava)
+  }
+
+  /**
+   * Shared implementation of [[extract]] and [[extractAll]]. `hadoopConf` is by-name, so it is
+   * only evaluated for files that reference a deletion vector.
+   */
+  private def extractWith(
+      hadoopConf: => Configuration,
+      partitionColumnCount: Int,
+      file: PartitionedFile): PartitionFileScanInfo = {
+    val metadata = otherMetadataColumns(file)
+    val normalizedMetadata = metadata -- Seq(RowIndexFilterIdEncoded, RowIndexFilterTypeKey)
+    val dvInfo = extractDeletionVectorInfo(hadoopConf, partitionColumnCount, file, metadata)
+    PartitionFileScanInfo(normalizedMetadata, dvInfo)
+  }
+
+  /**
+   * Decodes the row-index-filter entries of `metadata`. Both entries must be present (a deletion
+   * vector is loaded) or both absent (every row is kept). A key mapped to `null` counts as absent.
+   */
+  private def extractDeletionVectorInfo(
+      hadoopConf: => Configuration,
+      partitionColumnCount: Int,
+      file: PartitionedFile,
+      metadata: Map[String, Object]): DeletionVectorInfo = {
+    val descriptorValue = metadata.get(RowIndexFilterIdEncoded).flatMap(Option(_))
+    val filterTypeValue = metadata.get(RowIndexFilterTypeKey).flatMap(Option(_))
+
+    (descriptorValue, filterTypeValue) match {
+      case (None, None) =>
+        DeletionVectorInfo(
+          hasDeletionVector = false,
+          rowIndexFilterType = KEEP_ALL,
+          cardinality = 0L,
+          serializedDeletionVector = Array.emptyByteArray)
+      case (Some(encodedDescriptor), Some(filterType)) =>
+        // Validate the cheap inputs before reading the bitmap from storage.
+        val rowIndexFilterType = parseRowIndexFilterType(filterType.toString)
+        val descriptor = parseDescriptor(encodedDescriptor.toString)
+        DeletionVectorInfo(
+          hasDeletionVector = true,
+          rowIndexFilterType = rowIndexFilterType,
+          cardinality = descriptor.cardinality,
+          serializedDeletionVector =
+            serializePayload(hadoopConf, partitionColumnCount, file, descriptor))
+      case _ =>
+        throw new IllegalStateException(
+          s"Both $RowIndexFilterIdEncoded and $RowIndexFilterTypeKey must either be present " +
+            s"or absent (file: ${file.filePath})")
+    }
+  }
+
+  /** The file's other constant metadata column values as a Scala map (empty when unset). */
+  private def otherMetadataColumns(file: PartitionedFile): Map[String, Object] =
+    Option(SparkShimLoader.getSparkShims.getOtherConstantMetadataColumnValues(file))
+      .fold(Map.empty[String, Object])(_.asScala.toMap)
+
+  /** Decodes a Base64-encoded deletion vector descriptor. */
+  private def parseDescriptor(encodedDescriptor: String): DeletionVectorDescriptor = {
+    try {
+      DeletionVectorDescriptor.deserializeFromBase64(encodedDescriptor)
+    } catch {
+      case NonFatal(e) =>
+        throw new IllegalArgumentException("Unable to parse Delta deletion vector descriptor", e)
+    }
+  }
+
+  /** Maps a filter type name from split metadata to [[RowIndexFilterType]]. */
+  private def parseRowIndexFilterType(filterType: String): RowIndexFilterType = {
+    filterType match {
+      case "IF_CONTAINED" => IF_CONTAINED
+      case "IF_NOT_CONTAINED" => IF_NOT_CONTAINED
+      case "KEEP_ALL" => KEEP_ALL
+      case unexpected =>
+        throw new IllegalStateException(s"Unexpected row index filter type: $unexpected")
+    }
+  }
+
+  /** Loads the deletion vector described by `descriptor` and serializes it portably. */
+  private def serializePayload(
+      hadoopConf: Configuration,
+      partitionColumnCount: Int,
+      file: PartitionedFile,
+      descriptor: DeletionVectorDescriptor): Array[Byte] = {
+    val tablePath = resolveTablePath(hadoopConf, partitionColumnCount, file)
+    if (tablePath == null) {
+      throw new IllegalStateException(
+        "Unable to resolve Delta table path while materializing deletion vector payload " +
+          s"for ${file.filePath}")
+    }
+    StoredBitmap
+      .create(descriptor, tablePath)
+      .load(new HadoopFileSystemDVStore(hadoopConf))
+      .serializeAsByteArray(RoaringBitmapArrayFormat.Portable)
+  }
+
+  /**
+   * Resolves the Delta table root for `file`.
+   *
+   * The root is first assumed to sit `partitionColumnCount` directories above the file's parent
+   * directory. If no `_delta_log` is found there, the nearest ancestor of the file that contains
+   * one is used instead. If no ancestor has one either, the assumed root is returned unchecked
+   * (`null` if it would lie above the filesystem root).
+   */
+  private def resolveTablePath(
+      hadoopConf: Configuration,
+      partitionColumnCount: Int,
+      file: PartitionedFile): Path = {
+    val fileParent = new Path(unescapePathName(file.filePath.toString)).getParent
+    val expectedTablePath = ancestor(fileParent, partitionColumnCount)
+    if (expectedTablePath != null && isDeltaTablePath(hadoopConf, expectedTablePath)) {
+      expectedTablePath
+    } else {
+      Iterator
+        .iterate(fileParent)(_.getParent)
+        .takeWhile(_ != null)
+        .find(isDeltaTablePath(hadoopConf, _))
+        .getOrElse(expectedTablePath)
+    }
+  }
+
+  /**
+   * Returns the directory `levels` steps above `path`. Returns null if the filesystem root is
+   * passed first, rather than failing with a NullPointerException.
+   */
+  @scala.annotation.tailrec
+  private def ancestor(path: Path, levels: Int): Path =
+    if (path == null || levels <= 0) path else ancestor(path.getParent, levels - 1)
+
+  /** Whether `path` contains a `_delta_log` entry; filesystem errors are treated as "no". */
+  private def isDeltaTablePath(hadoopConf: Configuration, path: Path): Boolean = {
+    val deltaLogPath = new Path(path, "_delta_log")
+    try {
+      deltaLogPath.getFileSystem(hadoopConf).exists(deltaLogPath)
+    } catch {
+      case NonFatal(_) => false
+    }
+  }
+
+  /**
+   * Decodes `%XX` escapes in `path`, one character per escape; malformed escapes are kept
+   * verbatim. Only ASCII hex digits are accepted: `Character.digit` alone would also accept
+   * non-ASCII digits and letters such as the fullwidth forms.
+   */
+  private def unescapePathName(path: String): String = {
+    if (path == null || path.indexOf('%') < 0) {
+      path
+    } else {
+      val builder = new StringBuilder(path.length)
+      var index = 0
+      while (index < path.length) {
+        val current = path.charAt(index)
+        val decoded =
+          if (current == '%' && index + 2 < path.length) {
+            val high = asciiHexDigit(path.charAt(index + 1))
+            val low = asciiHexDigit(path.charAt(index + 2))
+            if (high >= 0 && low >= 0) high * 16 + low else -1
+          } else {
+            -1
+          }
+        if (decoded >= 0) {
+          builder.append(decoded.toChar)
+          index += 3
+        } else {
+          builder.append(current)
+          index += 1
+        }
+      }
+      builder.toString()
+    }
+  }
+
+  /** Value of `c` as a hexadecimal digit, or -1 if `c` is not an ASCII hex digit. */
+  private def asciiHexDigit(c: Char): Int = if (c < 0x80) Character.digit(c, 16) else -1
+}
diff --git a/backends-velox/src-delta33/test/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfoSuite.scala b/backends-velox/src-delta33/test/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfoSuite.scala
new file mode 100644
index 0000000000..cbb70817ba
--- /dev/null
+++ b/backends-velox/src-delta33/test/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfoSuite.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.delta
+
+import org.apache.gluten.delta.DeltaDeletionVectorScanInfo.RowIndexFilterType
+
+import org.apache.spark.SparkConf
+import org.apache.spark.paths.SparkPath
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.delta.{DeltaLog, GlutenDeltaParquetFileFormat}
+import org.apache.spark.sql.delta.catalog.DeltaCatalog
+import org.apache.spark.sql.delta.test.DeltaSQLTestUtils
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.tags.ExtendedSQLTest
+
+import io.delta.sql.DeltaSparkSessionExtension
+import org.apache.hadoop.fs.Path
+
+@ExtendedSQLTest
+class DeltaDeletionVectorScanInfoSuite
+  extends QueryTest
+  with SharedSparkSession
+  with DeltaSQLTestUtils {
+
+  import testImplicits._
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set(StaticSQLConf.SPARK_SESSION_EXTENSIONS.key, classOf[DeltaSparkSessionExtension].getName)
+      .set(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION.key, classOf[DeltaCatalog].getName)
+      .set("spark.databricks.delta.snapshotPartitions", "2")
+  }
+
+  test("extracts essential Delta DV scan info from split metadata") {
+    withTempDir {
+      tempDir =>
+        val path = tempDir.getCanonicalPath
+        Seq((1, "a"), (2, "b"), (3, "c"), (4, "d"))
+          .toDF("id", "value")
+          .coalesce(1)
+          .write
+          .format("delta")
+          .save(path)
+        deleteWithDeletionVectors(path, 3, 4)
+
+        val dataFile = latestDataFiles(path).find(_.deletionVector != null).get
+        val partitionedFile = partitionedFileWithMetadata(
+          path,
+          dataFile.path,
+          dataFile.size,
+          Map(
+            GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED ->
+              dataFile.deletionVector.serializeToBase64(),
+            GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE -> "IF_CONTAINED",
+            "kept_key" -> "kept_value"
+          )
+        )
+
+        val scanInfo = DeltaDeletionVectorScanInfo.extract(spark, 0, partitionedFile)
+        val dvInfo = scanInfo.deletionVectorInfo
+
+        assert(dvInfo.hasDeletionVector)
+        assert(dvInfo.rowIndexFilterType == RowIndexFilterType.IF_CONTAINED)
+        assert(dvInfo.cardinality == dataFile.deletionVector.cardinality)
+        assert(dvInfo.serializedDeletionVector.nonEmpty)
+        assert(scanInfo.normalizedOtherMetadataColumns == Map("kept_key" -> "kept_value"))
+    }
+  }
+
+  test("resolves the table root for files under partition directories") {
+    withTempDir {
+      tempDir =>
+        val path = tempDir.getCanonicalPath
+        Seq((1, "a", 0), (2, "b", 0), (3, "c", 0), (4, "d", 0))
+          .toDF("id", "value", "part")
+          .coalesce(1)
+          .write
+          .format("delta")
+          .partitionBy("part")
+          .save(path)
+        deleteWithDeletionVectors(path, 3, 4)
+
+        val dataFile = latestDataFiles(path).find(_.deletionVector != null).get
+        val partitionedFile = partitionedFileWithMetadata(
+          path,
+          dataFile.path,
+          dataFile.size,
+          Map(
+            GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED ->
+              dataFile.deletionVector.serializeToBase64(),
+            GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE -> "IF_CONTAINED"
+          )
+        )
+
+        // 1 matches the directory layout exactly; 0 forces the search for _delta_log upwards.
+        Seq(1, 0).foreach {
+          partitionColumnCount =>
+            val dvInfo = DeltaDeletionVectorScanInfo
+              .extract(spark, partitionColumnCount, partitionedFile)
+              .deletionVectorInfo
+            assert(dvInfo.hasDeletionVector)
+            assert(dvInfo.cardinality == dataFile.deletionVector.cardinality)
+            assert(dvInfo.serializedDeletionVector.nonEmpty)
+        }
+    }
+  }
+
+  test("returns keep-all scan info when Delta DV metadata is absent") {
+    withTempDir {
+      tempDir =>
+        val path = tempDir.getCanonicalPath
+        Seq((1, "a")).toDF("id", "value").coalesce(1).write.format("delta").save(path)
+
+        val dataFile = latestDataFiles(path).head
+        val partitionedFile = partitionedFileWithMetadata(
+          path,
+          dataFile.path,
+          dataFile.size,
+          Map("kept_key" -> "kept_value"))
+
+        val scanInfo = DeltaDeletionVectorScanInfo.extract(spark, 0, partitionedFile)
+        val dvInfo = scanInfo.deletionVectorInfo
+
+        assert(!dvInfo.hasDeletionVector)
+        assert(dvInfo.rowIndexFilterType == RowIndexFilterType.KEEP_ALL)
+        assert(dvInfo.cardinality == 0L)
+        assert(dvInfo.serializedDeletionVector.isEmpty)
+        assert(scanInfo.normalizedOtherMetadataColumns == Map("kept_key" -> "kept_value"))
+    }
+  }
+
+  test("rejects partial Delta DV split metadata") {
+    withTempDir {
+      tempDir =>
+        val path = tempDir.getCanonicalPath
+        Seq((1, "a")).toDF("id", "value").coalesce(1).write.format("delta").save(path)
+        val dataFile = latestDataFiles(path).head
+
+        // Each map carries exactly one of the two entries that must appear together.
+        Seq(
+          Map(GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE -> "IF_CONTAINED"),
+          Map(GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED -> "unused")
+        ).foreach {
+          metadata =>
+            val partitionedFile =
+              partitionedFileWithMetadata(path, dataFile.path, dataFile.size, metadata)
+            val error = intercept[IllegalStateException] {
+              DeltaDeletionVectorScanInfo.extract(spark, 0, partitionedFile)
+            }
+            assert(error.getMessage.contains("must either be present or absent"))
+        }
+    }
+  }
+
+  /** Enables deletion vectors on the Delta table at `path`, then deletes the rows with `ids`. */
+  private def deleteWithDeletionVectors(path: String, ids: Int*): Unit = {
+    spark.sql(
+      s"ALTER TABLE delta.`$path` SET TBLPROPERTIES ('delta.enableDeletionVectors' = true)")
+    spark.sql(s"DELETE FROM delta.`$path` WHERE id IN (${ids.mkString(", ")})")
+  }
+
+  /** The data files in the latest snapshot of the Delta table at `path`. */
+  private def latestDataFiles(path: String) =
+    DeltaLog.forTable(spark, new Path(path)).update().allFiles.collect()
+
+  /** A whole-file split of `relativeFilePath` under `tablePath` carrying `metadata`. */
+  private def partitionedFileWithMetadata(
+      tablePath: String,
+      relativeFilePath: String,
+      fileSize: Long,
+      metadata: Map[String, Object]): PartitionedFile = {
+    PartitionedFile(
+      partitionValues = InternalRow.empty,
+      filePath = SparkPath.fromPath(new Path(tablePath, relativeFilePath)),
+      start = 0L,
+      length = fileSize,
+      fileSize = fileSize,
+      otherConstantMetadataColumnValues = metadata
+    )
+  }
+}
diff --git a/backends-velox/src-delta40/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala b/backends-velox/src-delta40/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
new file mode 100644
index 0000000000..69a5dc3078
--- /dev/null
+++ b/backends-velox/src-delta40/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.delta
+
+import org.apache.gluten.sql.shims.SparkShimLoader
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.delta.GlutenDeltaParquetFileFormat
+import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
+import org.apache.spark.sql.delta.deletionvectors.{RoaringBitmapArrayFormat,
StoredBitmap}
+import org.apache.spark.sql.delta.storage.dv.HadoopFileSystemDVStore
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+
+import org.apache.hadoop.fs.Path
+
+import java.util.{ArrayList => JArrayList}
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+import scala.util.control.NonFatal
+
+/**
+ * Builds the per-file scan information a native scan needs to honor Delta deletion vectors.
+ *
+ * For every [[PartitionedFile]], the Delta row-index-filter entries are stripped from the file's
+ * other constant metadata columns. When those entries are present, the referenced deletion
+ * vector is loaded from storage and re-serialized in the portable RoaringBitmapArray format.
+ */
+object DeltaDeletionVectorScanInfo {
+
+  /** Row index filter types; the names match the values accepted from split metadata. */
+  object RowIndexFilterType extends Enumeration {
+    type RowIndexFilterType = Value
+    val KEEP_ALL, IF_CONTAINED, IF_NOT_CONTAINED = Value
+  }
+
+  import RowIndexFilterType._
+  import org.apache.hadoop.conf.Configuration
+  import scala.util.{Failure, Success}
+
+  /**
+   * Deletion vector details for a single file.
+   *
+   * @param hasDeletionVector whether the split metadata referenced a deletion vector
+   * @param rowIndexFilterType how the row indexes in the deletion vector are to be applied
+   * @param cardinality cardinality recorded in the deletion vector descriptor (0 when absent)
+   * @param serializedDeletionVector the bitmap serialized with
+   *   `RoaringBitmapArrayFormat.Portable`, or empty when there is no deletion vector. Because it
+   *   is an `Array`, it is compared by reference in this case class's `equals`/`hashCode`.
+   */
+  final case class DeletionVectorInfo(
+      hasDeletionVector: Boolean,
+      rowIndexFilterType: RowIndexFilterType,
+      cardinality: Long,
+      serializedDeletionVector: Array[Byte])
+
+  /**
+   * Scan information for a single file.
+   *
+   * @param normalizedOtherMetadataColumns the file's other constant metadata columns, without the
+   *   Delta row-index-filter entries
+   * @param deletionVectorInfo the file's deletion vector details
+   */
+  final case class PartitionFileScanInfo(
+      normalizedOtherMetadataColumns: Map[String, Object],
+      deletionVectorInfo: DeletionVectorInfo)
+
+  private val RowIndexFilterIdEncoded =
+    GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED
+  private val RowIndexFilterTypeKey =
+    GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE
+
+  /** `DeletionVectorDescriptor` factory methods tried, in order, by [[parseDescriptor]]. */
+  private val DescriptorParserMethods = List("deserializeFromBase64", "fromJson")
+
+  /**
+   * Extracts the scan information for one file.
+   *
+   * @param spark session whose Hadoop configuration is used to access the table's storage
+   * @param partitionColumnCount number of partition directory levels expected between the table
+   *   root and the data file
+   * @param file the file to inspect
+   * @throws IllegalStateException if only one of the two row-index-filter entries is present,
+   *   the filter type is unknown, or the table root cannot be resolved
+   * @throws IllegalArgumentException if the encoded deletion vector descriptor cannot be parsed
+   */
+  def extract(
+      spark: SparkSession,
+      partitionColumnCount: Int,
+      file: PartitionedFile): PartitionFileScanInfo = {
+    lazy val hadoopConf = spark.sessionState.newHadoopConf()
+    extractWith(hadoopConf, partitionColumnCount, file)
+  }
+
+  /** Extracts the scan information for each of `files`, in order; see [[extract]]. */
+  def extractAll(
+      spark: SparkSession,
+      partitionColumnCount: Int,
+      files: Seq[PartitionedFile]): Seq[PartitionFileScanInfo] = {
+    // newHadoopConf() builds a fresh Configuration from the session on every call. Build it at
+    // most once per batch, and only if some file actually references a deletion vector.
+    lazy val hadoopConf = spark.sessionState.newHadoopConf()
+    files.map(extractWith(hadoopConf, partitionColumnCount, _))
+  }
+
+  /** Java-friendly variant of [[extractAll]]; returns a new mutable `java.util.ArrayList`. */
+  def extractAllFromJava(
+      spark: SparkSession,
+      partitionColumnCount: Int,
+      files: java.util.List[PartitionedFile]): java.util.List[PartitionFileScanInfo] = {
+    new JArrayList[PartitionFileScanInfo](
+      extractAll(spark, partitionColumnCount, files.asScala.toSeq).asJava)
+  }
+
+  /**
+   * Shared implementation of [[extract]] and [[extractAll]]. `hadoopConf` is by-name, so it is
+   * only evaluated for files that reference a deletion vector.
+   */
+  private def extractWith(
+      hadoopConf: => Configuration,
+      partitionColumnCount: Int,
+      file: PartitionedFile): PartitionFileScanInfo = {
+    val metadata = otherMetadataColumns(file)
+    val normalizedMetadata = metadata -- Seq(RowIndexFilterIdEncoded, RowIndexFilterTypeKey)
+    val dvInfo = extractDeletionVectorInfo(hadoopConf, partitionColumnCount, file, metadata)
+    PartitionFileScanInfo(normalizedMetadata, dvInfo)
+  }
+
+  /**
+   * Decodes the row-index-filter entries of `metadata`. Both entries must be present (a deletion
+   * vector is loaded) or both absent (every row is kept). A key mapped to `null` counts as absent.
+   */
+  private def extractDeletionVectorInfo(
+      hadoopConf: => Configuration,
+      partitionColumnCount: Int,
+      file: PartitionedFile,
+      metadata: Map[String, Object]): DeletionVectorInfo = {
+    val descriptorValue = metadata.get(RowIndexFilterIdEncoded).flatMap(Option(_))
+    val filterTypeValue = metadata.get(RowIndexFilterTypeKey).flatMap(Option(_))
+
+    (descriptorValue, filterTypeValue) match {
+      case (None, None) =>
+        DeletionVectorInfo(
+          hasDeletionVector = false,
+          rowIndexFilterType = KEEP_ALL,
+          cardinality = 0L,
+          serializedDeletionVector = Array.emptyByteArray)
+      case (Some(encodedDescriptor), Some(filterType)) =>
+        // Validate the cheap inputs before reading the bitmap from storage.
+        val rowIndexFilterType = parseRowIndexFilterType(filterType.toString)
+        val descriptor = parseDescriptor(encodedDescriptor.toString)
+        DeletionVectorInfo(
+          hasDeletionVector = true,
+          rowIndexFilterType = rowIndexFilterType,
+          cardinality = descriptor.cardinality,
+          serializedDeletionVector =
+            serializePayload(hadoopConf, partitionColumnCount, file, descriptor))
+      case _ =>
+        throw new IllegalStateException(
+          s"Both $RowIndexFilterIdEncoded and $RowIndexFilterTypeKey must either be present " +
+            s"or absent (file: ${file.filePath})")
+    }
+  }
+
+  /** The file's other constant metadata column values as a Scala map (empty when unset). */
+  private def otherMetadataColumns(file: PartitionedFile): Map[String, Object] =
+    Option(SparkShimLoader.getSparkShims.getOtherConstantMetadataColumnValues(file))
+      .fold(Map.empty[String, Object])(_.asScala.toMap)
+
+  /**
+   * Parses an encoded deletion vector descriptor. It uses the first factory method in
+   * `DescriptorParserMethods` that both exists on `DeletionVectorDescriptor` and accepts the
+   * input. The methods are looked up reflectively, presumably so that this source compiles
+   * whichever of them the linked Delta version provides.
+   *
+   * @throws IllegalArgumentException if no method succeeds; the first failure is attached as the
+   *   cause and the remaining ones as suppressed exceptions
+   */
+  private def parseDescriptor(encodedDescriptor: String): DeletionVectorDescriptor = {
+    @scala.annotation.tailrec
+    def attempt(methods: List[String], failures: List[Throwable]): DeletionVectorDescriptor =
+      methods match {
+        case methodName :: remaining =>
+          invokeDescriptorParser(methodName, encodedDescriptor) match {
+            case Success(descriptor) => descriptor
+            case Failure(e) => attempt(remaining, e :: failures)
+          }
+        case Nil =>
+          val inOrder = failures.reverse
+          val error = new IllegalArgumentException(
+            "Unable to parse Delta deletion vector descriptor",
+            inOrder.headOption.orNull)
+          inOrder.drop(1).foreach(error.addSuppressed)
+          throw error
+      }
+
+    attempt(DescriptorParserMethods, Nil)
+  }
+
+  /**
+   * Calls `DeletionVectorDescriptor.<methodName>(encodedDescriptor)` reflectively. An exception
+   * thrown by the method itself is unwrapped from the reflective `InvocationTargetException`.
+   */
+  private def invokeDescriptorParser(
+      methodName: String,
+      encodedDescriptor: String): Try[DeletionVectorDescriptor] =
+    Try {
+      DeletionVectorDescriptor.getClass
+        .getMethod(methodName, classOf[String])
+        .invoke(DeletionVectorDescriptor, encodedDescriptor)
+        .asInstanceOf[DeletionVectorDescriptor]
+    }.recoverWith {
+      case e: java.lang.reflect.InvocationTargetException if e.getCause != null =>
+        Failure(e.getCause)
+    }
+
+  /** Maps a filter type name from split metadata to [[RowIndexFilterType]]. */
+  private def parseRowIndexFilterType(filterType: String): RowIndexFilterType = {
+    filterType match {
+      case "IF_CONTAINED" => IF_CONTAINED
+      case "IF_NOT_CONTAINED" => IF_NOT_CONTAINED
+      case "KEEP_ALL" => KEEP_ALL
+      case unexpected =>
+        throw new IllegalStateException(s"Unexpected row index filter type: $unexpected")
+    }
+  }
+
+  /** Loads the deletion vector described by `descriptor` and serializes it portably. */
+  private def serializePayload(
+      hadoopConf: Configuration,
+      partitionColumnCount: Int,
+      file: PartitionedFile,
+      descriptor: DeletionVectorDescriptor): Array[Byte] = {
+    val tablePath = resolveTablePath(hadoopConf, partitionColumnCount, file)
+    if (tablePath == null) {
+      throw new IllegalStateException(
+        "Unable to resolve Delta table path while materializing deletion vector payload " +
+          s"for ${file.filePath}")
+    }
+    StoredBitmap
+      .create(descriptor, tablePath)
+      .load(new HadoopFileSystemDVStore(hadoopConf))
+      .serializeAsByteArray(RoaringBitmapArrayFormat.Portable)
+  }
+
+  /**
+   * Resolves the Delta table root for `file`.
+   *
+   * The root is first assumed to sit `partitionColumnCount` directories above the file's parent
+   * directory. If no `_delta_log` is found there, the nearest ancestor of the file that contains
+   * one is used instead. If no ancestor has one either, the assumed root is returned unchecked
+   * (`null` if it would lie above the filesystem root).
+   */
+  private def resolveTablePath(
+      hadoopConf: Configuration,
+      partitionColumnCount: Int,
+      file: PartitionedFile): Path = {
+    val fileParent = new Path(unescapePathName(file.filePath.toString)).getParent
+    val expectedTablePath = ancestor(fileParent, partitionColumnCount)
+    if (expectedTablePath != null && isDeltaTablePath(hadoopConf, expectedTablePath)) {
+      expectedTablePath
+    } else {
+      Iterator
+        .iterate(fileParent)(_.getParent)
+        .takeWhile(_ != null)
+        .find(isDeltaTablePath(hadoopConf, _))
+        .getOrElse(expectedTablePath)
+    }
+  }
+
+  /**
+   * Returns the directory `levels` steps above `path`. Returns null if the filesystem root is
+   * passed first, rather than failing with a NullPointerException.
+   */
+  @scala.annotation.tailrec
+  private def ancestor(path: Path, levels: Int): Path =
+    if (path == null || levels <= 0) path else ancestor(path.getParent, levels - 1)
+
+  /** Whether `path` contains a `_delta_log` entry; filesystem errors are treated as "no". */
+  private def isDeltaTablePath(hadoopConf: Configuration, path: Path): Boolean = {
+    val deltaLogPath = new Path(path, "_delta_log")
+    try {
+      deltaLogPath.getFileSystem(hadoopConf).exists(deltaLogPath)
+    } catch {
+      case NonFatal(_) => false
+    }
+  }
+
+  /**
+   * Decodes `%XX` escapes in `path`, one character per escape; malformed escapes are kept
+   * verbatim. Only ASCII hex digits are accepted: `Character.digit` alone would also accept
+   * non-ASCII digits and letters such as the fullwidth forms.
+   */
+  private def unescapePathName(path: String): String = {
+    if (path == null || path.indexOf('%') < 0) {
+      path
+    } else {
+      val builder = new StringBuilder(path.length)
+      var index = 0
+      while (index < path.length) {
+        val current = path.charAt(index)
+        val decoded =
+          if (current == '%' && index + 2 < path.length) {
+            val high = asciiHexDigit(path.charAt(index + 1))
+            val low = asciiHexDigit(path.charAt(index + 2))
+            if (high >= 0 && low >= 0) high * 16 + low else -1
+          } else {
+            -1
+          }
+        if (decoded >= 0) {
+          builder.append(decoded.toChar)
+          index += 3
+        } else {
+          builder.append(current)
+          index += 1
+        }
+      }
+      builder.toString()
+    }
+  }
+
+  /** Value of `c` as a hexadecimal digit, or -1 if `c` is not an ASCII hex digit. */
+  private def asciiHexDigit(c: Char): Int = if (c < 0x80) Character.digit(c, 16) else -1
+}
diff --git a/backends-velox/src-delta40/test/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfoSuite.scala b/backends-velox/src-delta40/test/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfoSuite.scala
new file mode 100644
index 0000000000..cbb70817ba
--- /dev/null
+++ b/backends-velox/src-delta40/test/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfoSuite.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.delta
+
+import org.apache.gluten.delta.DeltaDeletionVectorScanInfo.RowIndexFilterType
+
+import org.apache.spark.SparkConf
+import org.apache.spark.paths.SparkPath
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.delta.{DeltaLog, GlutenDeltaParquetFileFormat}
+import org.apache.spark.sql.delta.catalog.DeltaCatalog
+import org.apache.spark.sql.delta.test.DeltaSQLTestUtils
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.tags.ExtendedSQLTest
+
+import io.delta.sql.DeltaSparkSessionExtension
+import org.apache.hadoop.fs.Path
+
+@ExtendedSQLTest
+class DeltaDeletionVectorScanInfoSuite
+  extends QueryTest
+  with SharedSparkSession
+  with DeltaSQLTestUtils {
+
+  import testImplicits._
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set(StaticSQLConf.SPARK_SESSION_EXTENSIONS.key, classOf[DeltaSparkSessionExtension].getName)
+      .set(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION.key, classOf[DeltaCatalog].getName)
+      .set("spark.databricks.delta.snapshotPartitions", "2")
+  }
+
+  test("extracts essential Delta DV scan info from split metadata") {
+    withTempDir {
+      tempDir =>
+        val path = tempDir.getCanonicalPath
+        Seq((1, "a"), (2, "b"), (3, "c"), (4, "d"))
+          .toDF("id", "value")
+          .coalesce(1)
+          .write
+          .format("delta")
+          .save(path)
+        deleteWithDeletionVectors(path, 3, 4)
+
+        val dataFile = latestDataFiles(path).find(_.deletionVector != null).get
+        val partitionedFile = partitionedFileWithMetadata(
+          path,
+          dataFile.path,
+          dataFile.size,
+          Map(
+            GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED ->
+              dataFile.deletionVector.serializeToBase64(),
+            GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE -> "IF_CONTAINED",
+            "kept_key" -> "kept_value"
+          )
+        )
+
+        val scanInfo = DeltaDeletionVectorScanInfo.extract(spark, 0, partitionedFile)
+        val dvInfo = scanInfo.deletionVectorInfo
+
+        assert(dvInfo.hasDeletionVector)
+        assert(dvInfo.rowIndexFilterType == RowIndexFilterType.IF_CONTAINED)
+        assert(dvInfo.cardinality == dataFile.deletionVector.cardinality)
+        assert(dvInfo.serializedDeletionVector.nonEmpty)
+        assert(scanInfo.normalizedOtherMetadataColumns == Map("kept_key" -> "kept_value"))
+    }
+  }
+
+  test("resolves the table root for files under partition directories") {
+    withTempDir {
+      tempDir =>
+        val path = tempDir.getCanonicalPath
+        Seq((1, "a", 0), (2, "b", 0), (3, "c", 0), (4, "d", 0))
+          .toDF("id", "value", "part")
+          .coalesce(1)
+          .write
+          .format("delta")
+          .partitionBy("part")
+          .save(path)
+        deleteWithDeletionVectors(path, 3, 4)
+
+        val dataFile = latestDataFiles(path).find(_.deletionVector != null).get
+        val partitionedFile = partitionedFileWithMetadata(
+          path,
+          dataFile.path,
+          dataFile.size,
+          Map(
+            GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED ->
+              dataFile.deletionVector.serializeToBase64(),
+            GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE -> "IF_CONTAINED"
+          )
+        )
+
+        // 1 matches the directory layout exactly; 0 forces the search for _delta_log upwards.
+        Seq(1, 0).foreach {
+          partitionColumnCount =>
+            val dvInfo = DeltaDeletionVectorScanInfo
+              .extract(spark, partitionColumnCount, partitionedFile)
+              .deletionVectorInfo
+            assert(dvInfo.hasDeletionVector)
+            assert(dvInfo.cardinality == dataFile.deletionVector.cardinality)
+            assert(dvInfo.serializedDeletionVector.nonEmpty)
+        }
+    }
+  }
+
+  test("returns keep-all scan info when Delta DV metadata is absent") {
+    withTempDir {
+      tempDir =>
+        val path = tempDir.getCanonicalPath
+        Seq((1, "a")).toDF("id", "value").coalesce(1).write.format("delta").save(path)
+
+        val dataFile = latestDataFiles(path).head
+        val partitionedFile = partitionedFileWithMetadata(
+          path,
+          dataFile.path,
+          dataFile.size,
+          Map("kept_key" -> "kept_value"))
+
+        val scanInfo = DeltaDeletionVectorScanInfo.extract(spark, 0, partitionedFile)
+        val dvInfo = scanInfo.deletionVectorInfo
+
+        assert(!dvInfo.hasDeletionVector)
+        assert(dvInfo.rowIndexFilterType == RowIndexFilterType.KEEP_ALL)
+        assert(dvInfo.cardinality == 0L)
+        assert(dvInfo.serializedDeletionVector.isEmpty)
+        assert(scanInfo.normalizedOtherMetadataColumns == Map("kept_key" -> "kept_value"))
+    }
+  }
+
+  test("rejects partial Delta DV split metadata") {
+    withTempDir {
+      tempDir =>
+        val path = tempDir.getCanonicalPath
+        Seq((1, "a")).toDF("id", "value").coalesce(1).write.format("delta").save(path)
+        val dataFile = latestDataFiles(path).head
+
+        // Each map carries exactly one of the two entries that must appear together.
+        Seq(
+          Map(GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE -> "IF_CONTAINED"),
+          Map(GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED -> "unused")
+        ).foreach {
+          metadata =>
+            val partitionedFile =
+              partitionedFileWithMetadata(path, dataFile.path, dataFile.size, metadata)
+            val error = intercept[IllegalStateException] {
+              DeltaDeletionVectorScanInfo.extract(spark, 0, partitionedFile)
+            }
+            assert(error.getMessage.contains("must either be present or absent"))
+        }
+    }
+  }
+
+  /** Enables deletion vectors on the Delta table at `path`, then deletes the rows with `ids`. */
+  private def deleteWithDeletionVectors(path: String, ids: Int*): Unit = {
+    spark.sql(
+      s"ALTER TABLE delta.`$path` SET TBLPROPERTIES ('delta.enableDeletionVectors' = true)")
+    spark.sql(s"DELETE FROM delta.`$path` WHERE id IN (${ids.mkString(", ")})")
+  }
+
+  /** The data files in the latest snapshot of the Delta table at `path`. */
+  private def latestDataFiles(path: String) =
+    DeltaLog.forTable(spark, new Path(path)).update().allFiles.collect()
+
+  /** A whole-file split of `relativeFilePath` under `tablePath` carrying `metadata`. */
+  private def partitionedFileWithMetadata(
+      tablePath: String,
+      relativeFilePath: String,
+      fileSize: Long,
+      metadata: Map[String, Object]): PartitionedFile = {
+    PartitionedFile(
+      partitionValues = InternalRow.empty,
+      filePath = SparkPath.fromPath(new Path(tablePath, relativeFilePath)),
+      start = 0L,
+      length = fileSize,
+      fileSize = fileSize,
+      otherConstantMetadataColumnValues = metadata
+    )
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]