This is an automated email from the ASF dual-hosted git repository.

zhztheplayer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 97f06b41d0 [VL][Delta] Add DV scan info extraction utility (#12197)
97f06b41d0 is described below

commit 97f06b41d07104455388ac764a075166e74f33e3
Author: Mohammad Linjawi <[email protected]>
AuthorDate: Wed Jun 3 16:12:03 2026 +0300

    [VL][Delta] Add DV scan info extraction utility (#12197)
    
    * [VL][Delta] Add DV scan info extraction utility
    
    * [VL][Delta] Hide DV descriptor from scan info API
    
    * [VL][Delta] Reorder DV scan info fields
    
    ---------
    
    Co-authored-by: Mohammad Linjawi <[email protected]>
---
 .../gluten/delta/DeltaDeletionVectorScanInfo.scala | 207 ++++++++++++++++++++
 .../delta/DeltaDeletionVectorScanInfoSuite.scala   | 154 +++++++++++++++
 .../gluten/delta/DeltaDeletionVectorScanInfo.scala | 217 +++++++++++++++++++++
 .../delta/DeltaDeletionVectorScanInfoSuite.scala   | 154 +++++++++++++++
 4 files changed, 732 insertions(+)

diff --git 
a/backends-velox/src-delta33/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
 
b/backends-velox/src-delta33/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
new file mode 100644
index 0000000000..8ada3d755c
--- /dev/null
+++ 
b/backends-velox/src-delta33/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.delta
+
+import org.apache.gluten.sql.shims.SparkShimLoader
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.delta.GlutenDeltaParquetFileFormat
+import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
+import org.apache.spark.sql.delta.deletionvectors.{RoaringBitmapArrayFormat, 
StoredBitmap}
+import org.apache.spark.sql.delta.storage.dv.HadoopFileSystemDVStore
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+
+import org.apache.hadoop.fs.Path
+
+import java.util.{ArrayList => JArrayList}
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+object DeltaDeletionVectorScanInfo {
+  object RowIndexFilterType extends Enumeration {
+    type RowIndexFilterType = Value
+    val KEEP_ALL, IF_CONTAINED, IF_NOT_CONTAINED = Value
+  }
+
+  import RowIndexFilterType._
+
+  final case class DeletionVectorInfo(
+      hasDeletionVector: Boolean,
+      rowIndexFilterType: RowIndexFilterType,
+      cardinality: Long,
+      serializedDeletionVector: Array[Byte])
+
+  final case class PartitionFileScanInfo(
+      normalizedOtherMetadataColumns: Map[String, Object],
+      deletionVectorInfo: DeletionVectorInfo)
+
+  private val RowIndexFilterIdEncoded =
+    GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED
+  private val RowIndexFilterTypeKey =
+    GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE
+
+  def extract(
+      spark: SparkSession,
+      partitionColumnCount: Int,
+      file: PartitionedFile): PartitionFileScanInfo = {
+    val metadata = otherMetadataColumns(file)
+    val normalizedMetadata = metadata -- Seq(RowIndexFilterIdEncoded, 
RowIndexFilterTypeKey)
+    val dvInfo = extractDeletionVectorInfo(spark, partitionColumnCount, file, 
metadata)
+    PartitionFileScanInfo(normalizedMetadata, dvInfo)
+  }
+
+  def extractAll(
+      spark: SparkSession,
+      partitionColumnCount: Int,
+      files: Seq[PartitionedFile]): Seq[PartitionFileScanInfo] = {
+    files.map(extract(spark, partitionColumnCount, _))
+  }
+
+  def extractAllFromJava(
+      spark: SparkSession,
+      partitionColumnCount: Int,
+      files: java.util.List[PartitionedFile]): 
java.util.List[PartitionFileScanInfo] = {
+    new JArrayList(extractAll(spark, partitionColumnCount, 
files.asScala.toSeq).asJava)
+  }
+
+  private def extractDeletionVectorInfo(
+      spark: SparkSession,
+      partitionColumnCount: Int,
+      file: PartitionedFile,
+      metadata: Map[String, Object]): DeletionVectorInfo = {
+    val descriptorValue = metadata.get(RowIndexFilterIdEncoded)
+    val filterTypeValue = metadata.get(RowIndexFilterTypeKey)
+
+    (descriptorValue, filterTypeValue) match {
+      case (None, None) =>
+        DeletionVectorInfo(false, KEEP_ALL, 0L, Array.emptyByteArray)
+      case (Some(encodedDescriptor), Some(filterType)) =>
+        val descriptor = parseDescriptor(encodedDescriptor.toString)
+        val serializedPayload = serializePayload(spark, partitionColumnCount, 
file, descriptor)
+        DeletionVectorInfo(
+          true,
+          parseRowIndexFilterType(filterType.toString),
+          descriptor.cardinality,
+          serializedPayload)
+      case _ =>
+        throw new IllegalStateException(
+          s"Both $RowIndexFilterIdEncoded and $RowIndexFilterTypeKey must 
either be present or absent")
+    }
+  }
+
+  private def otherMetadataColumns(file: PartitionedFile): Map[String, Object] 
= {
+    val otherMetadata =
+      SparkShimLoader.getSparkShims.getOtherConstantMetadataColumnValues(file)
+    if (otherMetadata == null) {
+      Map.empty
+    } else {
+      otherMetadata.asScala.toMap
+    }
+  }
+
+  private def parseDescriptor(encodedDescriptor: String): 
DeletionVectorDescriptor = {
+    try {
+      DeletionVectorDescriptor.deserializeFromBase64(encodedDescriptor)
+    } catch {
+      case NonFatal(e) =>
+        throw new IllegalArgumentException("Unable to parse Delta deletion 
vector descriptor", e)
+    }
+  }
+
+  private def parseRowIndexFilterType(filterType: String): RowIndexFilterType 
= {
+    filterType match {
+      case "IF_CONTAINED" => IF_CONTAINED
+      case "IF_NOT_CONTAINED" => IF_NOT_CONTAINED
+      case "KEEP_ALL" => KEEP_ALL
+      case unexpected =>
+        throw new IllegalStateException(s"Unexpected row index filter type: 
$unexpected")
+    }
+  }
+
+  private def serializePayload(
+      spark: SparkSession,
+      partitionColumnCount: Int,
+      file: PartitionedFile,
+      descriptor: DeletionVectorDescriptor): Array[Byte] = {
+    val tablePath = resolveTablePath(spark, partitionColumnCount, file)
+    if (tablePath == null) {
+      throw new IllegalStateException(
+        "Unable to resolve Delta table path while materializing deletion 
vector payload")
+    }
+    val dvStore = new 
HadoopFileSystemDVStore(spark.sessionState.newHadoopConf())
+    StoredBitmap
+      .create(descriptor, tablePath)
+      .load(dvStore)
+      .serializeAsByteArray(RoaringBitmapArrayFormat.Portable)
+  }
+
+  private def resolveTablePath(
+      spark: SparkSession,
+      partitionColumnCount: Int,
+      file: PartitionedFile): Path = {
+    val fileParent = new 
Path(unescapePathName(file.filePath.toString)).getParent
+    var tablePath = fileParent
+    for (_ <- 0 until partitionColumnCount) {
+      tablePath = tablePath.getParent
+    }
+    if (tablePath != null && isDeltaTablePath(spark, tablePath)) {
+      return tablePath
+    }
+
+    var candidate = fileParent
+    while (candidate != null && !isDeltaTablePath(spark, candidate)) {
+      candidate = candidate.getParent
+    }
+    if (candidate != null) candidate else tablePath
+  }
+
+  private def isDeltaTablePath(spark: SparkSession, tablePath: Path): Boolean 
= {
+    val deltaLogPath = new Path(tablePath, "_delta_log")
+    try {
+      
deltaLogPath.getFileSystem(spark.sessionState.newHadoopConf()).exists(deltaLogPath)
+    } catch {
+      case NonFatal(_) => false
+    }
+  }
+
+  private def unescapePathName(path: String): String = {
+    if (path == null || path.indexOf('%') < 0) {
+      path
+    } else {
+      val builder = new StringBuilder(path.length)
+      var index = 0
+      while (index < path.length) {
+        if (path.charAt(index) == '%' && index + 2 < path.length) {
+          val high = Character.digit(path.charAt(index + 1), 16)
+          val low = Character.digit(path.charAt(index + 2), 16)
+          if (high >= 0 && low >= 0) {
+            builder.append(((high << 4) | low).toChar)
+            index += 3
+          } else {
+            builder.append(path.charAt(index))
+            index += 1
+          }
+        } else {
+          builder.append(path.charAt(index))
+          index += 1
+        }
+      }
+      builder.toString()
+    }
+  }
+}
diff --git 
a/backends-velox/src-delta33/test/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfoSuite.scala
 
b/backends-velox/src-delta33/test/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfoSuite.scala
new file mode 100644
index 0000000000..cbb70817ba
--- /dev/null
+++ 
b/backends-velox/src-delta33/test/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfoSuite.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.delta
+
+import org.apache.gluten.delta.DeltaDeletionVectorScanInfo.RowIndexFilterType
+
+import org.apache.spark.SparkConf
+import org.apache.spark.paths.SparkPath
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.delta.{DeltaLog, GlutenDeltaParquetFileFormat}
+import org.apache.spark.sql.delta.catalog.DeltaCatalog
+import org.apache.spark.sql.delta.test.DeltaSQLTestUtils
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.tags.ExtendedSQLTest
+
+import io.delta.sql.DeltaSparkSessionExtension
+import org.apache.hadoop.fs.Path
+
+@ExtendedSQLTest
+class DeltaDeletionVectorScanInfoSuite
+  extends QueryTest
+  with SharedSparkSession
+  with DeltaSQLTestUtils {
+
+  import testImplicits._
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set(StaticSQLConf.SPARK_SESSION_EXTENSIONS.key, 
classOf[DeltaSparkSessionExtension].getName)
+      .set(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION.key, 
classOf[DeltaCatalog].getName)
+      .set("spark.databricks.delta.snapshotPartitions", "2")
+  }
+
+  test("extracts essential Delta DV scan info from split metadata") {
+    withTempDir {
+      tempDir =>
+        val path = tempDir.getCanonicalPath
+        Seq((1, "a"), (2, "b"), (3, "c"), (4, "d"))
+          .toDF("id", "value")
+          .coalesce(1)
+          .write
+          .format("delta")
+          .save(path)
+
+        spark.sql(
+          s"ALTER TABLE delta.`$path` SET TBLPROPERTIES 
('delta.enableDeletionVectors' = true)")
+        spark.sql(s"DELETE FROM delta.`$path` WHERE id IN (3, 4)")
+
+        val dataFile = DeltaLog
+          .forTable(spark, new Path(path))
+          .update()
+          .allFiles
+          .collect()
+          .find(_.deletionVector != null)
+          .get
+        val partitionedFile = partitionedFileWithMetadata(
+          path,
+          dataFile.path,
+          dataFile.size,
+          Map(
+            GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED ->
+              dataFile.deletionVector.serializeToBase64(),
+            GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE -> 
"IF_CONTAINED",
+            "kept_key" -> "kept_value"
+          )
+        )
+
+        val scanInfo = DeltaDeletionVectorScanInfo.extract(spark, 0, 
partitionedFile)
+        val dvInfo = scanInfo.deletionVectorInfo
+
+        assert(dvInfo.hasDeletionVector)
+        assert(dvInfo.rowIndexFilterType == RowIndexFilterType.IF_CONTAINED)
+        assert(dvInfo.cardinality == dataFile.deletionVector.cardinality)
+        assert(dvInfo.serializedDeletionVector.nonEmpty)
+        assert(scanInfo.normalizedOtherMetadataColumns == Map("kept_key" -> 
"kept_value"))
+    }
+  }
+
+  test("returns keep-all scan info when Delta DV metadata is absent") {
+    withTempDir {
+      tempDir =>
+        val path = tempDir.getCanonicalPath
+        Seq((1, "a")).toDF("id", 
"value").coalesce(1).write.format("delta").save(path)
+
+        val dataFile = DeltaLog.forTable(spark, new 
Path(path)).update().allFiles.collect().head
+        val partitionedFile = partitionedFileWithMetadata(
+          path,
+          dataFile.path,
+          dataFile.size,
+          Map("kept_key" -> "kept_value"))
+
+        val scanInfo = DeltaDeletionVectorScanInfo.extract(spark, 0, 
partitionedFile)
+        val dvInfo = scanInfo.deletionVectorInfo
+
+        assert(!dvInfo.hasDeletionVector)
+        assert(dvInfo.rowIndexFilterType == RowIndexFilterType.KEEP_ALL)
+        assert(dvInfo.cardinality == 0L)
+        assert(dvInfo.serializedDeletionVector.isEmpty)
+        assert(scanInfo.normalizedOtherMetadataColumns == Map("kept_key" -> 
"kept_value"))
+    }
+  }
+
+  test("rejects partial Delta DV split metadata") {
+    withTempDir {
+      tempDir =>
+        val path = tempDir.getCanonicalPath
+        Seq((1, "a")).toDF("id", 
"value").coalesce(1).write.format("delta").save(path)
+
+        val dataFile = DeltaLog.forTable(spark, new 
Path(path)).update().allFiles.collect().head
+        val partitionedFile = partitionedFileWithMetadata(
+          path,
+          dataFile.path,
+          dataFile.size,
+          Map(GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE -> 
"IF_CONTAINED"))
+
+        val error = intercept[IllegalStateException] {
+          DeltaDeletionVectorScanInfo.extract(spark, 0, partitionedFile)
+        }
+        assert(error.getMessage.contains("must either be present or absent"))
+    }
+  }
+
+  private def partitionedFileWithMetadata(
+      tablePath: String,
+      relativeFilePath: String,
+      fileSize: Long,
+      metadata: Map[String, Object]): PartitionedFile = {
+    PartitionedFile(
+      partitionValues = InternalRow.empty,
+      filePath = SparkPath.fromPath(new Path(tablePath, relativeFilePath)),
+      start = 0L,
+      length = fileSize,
+      fileSize = fileSize,
+      otherConstantMetadataColumnValues = metadata
+    )
+  }
+}
diff --git 
a/backends-velox/src-delta40/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
 
b/backends-velox/src-delta40/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
new file mode 100644
index 0000000000..69a5dc3078
--- /dev/null
+++ 
b/backends-velox/src-delta40/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.delta
+
+import org.apache.gluten.sql.shims.SparkShimLoader
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.delta.GlutenDeltaParquetFileFormat
+import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
+import org.apache.spark.sql.delta.deletionvectors.{RoaringBitmapArrayFormat, 
StoredBitmap}
+import org.apache.spark.sql.delta.storage.dv.HadoopFileSystemDVStore
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+
+import org.apache.hadoop.fs.Path
+
+import java.util.{ArrayList => JArrayList}
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+import scala.util.control.NonFatal
+
+object DeltaDeletionVectorScanInfo {
+  object RowIndexFilterType extends Enumeration {
+    type RowIndexFilterType = Value
+    val KEEP_ALL, IF_CONTAINED, IF_NOT_CONTAINED = Value
+  }
+
+  import RowIndexFilterType._
+
+  final case class DeletionVectorInfo(
+      hasDeletionVector: Boolean,
+      rowIndexFilterType: RowIndexFilterType,
+      cardinality: Long,
+      serializedDeletionVector: Array[Byte])
+
+  final case class PartitionFileScanInfo(
+      normalizedOtherMetadataColumns: Map[String, Object],
+      deletionVectorInfo: DeletionVectorInfo)
+
+  private val RowIndexFilterIdEncoded =
+    GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED
+  private val RowIndexFilterTypeKey =
+    GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE
+
+  def extract(
+      spark: SparkSession,
+      partitionColumnCount: Int,
+      file: PartitionedFile): PartitionFileScanInfo = {
+    val metadata = otherMetadataColumns(file)
+    val normalizedMetadata = metadata -- Seq(RowIndexFilterIdEncoded, 
RowIndexFilterTypeKey)
+    val dvInfo = extractDeletionVectorInfo(spark, partitionColumnCount, file, 
metadata)
+    PartitionFileScanInfo(normalizedMetadata, dvInfo)
+  }
+
+  def extractAll(
+      spark: SparkSession,
+      partitionColumnCount: Int,
+      files: Seq[PartitionedFile]): Seq[PartitionFileScanInfo] = {
+    files.map(extract(spark, partitionColumnCount, _))
+  }
+
+  def extractAllFromJava(
+      spark: SparkSession,
+      partitionColumnCount: Int,
+      files: java.util.List[PartitionedFile]): 
java.util.List[PartitionFileScanInfo] = {
+    new JArrayList(extractAll(spark, partitionColumnCount, 
files.asScala.toSeq).asJava)
+  }
+
+  private def extractDeletionVectorInfo(
+      spark: SparkSession,
+      partitionColumnCount: Int,
+      file: PartitionedFile,
+      metadata: Map[String, Object]): DeletionVectorInfo = {
+    val descriptorValue = metadata.get(RowIndexFilterIdEncoded)
+    val filterTypeValue = metadata.get(RowIndexFilterTypeKey)
+
+    (descriptorValue, filterTypeValue) match {
+      case (None, None) =>
+        DeletionVectorInfo(false, KEEP_ALL, 0L, Array.emptyByteArray)
+      case (Some(encodedDescriptor), Some(filterType)) =>
+        val descriptor = parseDescriptor(encodedDescriptor.toString)
+        val serializedPayload = serializePayload(spark, partitionColumnCount, 
file, descriptor)
+        DeletionVectorInfo(
+          true,
+          parseRowIndexFilterType(filterType.toString),
+          descriptor.cardinality,
+          serializedPayload)
+      case _ =>
+        throw new IllegalStateException(
+          s"Both $RowIndexFilterIdEncoded and $RowIndexFilterTypeKey must 
either be present or absent")
+    }
+  }
+
+  private def otherMetadataColumns(file: PartitionedFile): Map[String, Object] 
= {
+    val otherMetadata =
+      SparkShimLoader.getSparkShims.getOtherConstantMetadataColumnValues(file)
+    if (otherMetadata == null) {
+      Map.empty
+    } else {
+      otherMetadata.asScala.toMap
+    }
+  }
+
+  private def parseDescriptor(encodedDescriptor: String): 
DeletionVectorDescriptor = {
+    val methods = Seq("deserializeFromBase64", "fromJson")
+    methods.iterator
+      .map {
+        methodName =>
+          Try {
+            val method = 
DeletionVectorDescriptor.getClass.getMethod(methodName, classOf[String])
+            method
+              .invoke(DeletionVectorDescriptor, encodedDescriptor)
+              .asInstanceOf[DeletionVectorDescriptor]
+          }.toOption
+      }
+      .collectFirst { case Some(descriptor) => descriptor }
+      .getOrElse {
+        throw new IllegalArgumentException("Unable to parse Delta deletion 
vector descriptor")
+      }
+  }
+
+  private def parseRowIndexFilterType(filterType: String): RowIndexFilterType 
= {
+    filterType match {
+      case "IF_CONTAINED" => IF_CONTAINED
+      case "IF_NOT_CONTAINED" => IF_NOT_CONTAINED
+      case "KEEP_ALL" => KEEP_ALL
+      case unexpected =>
+        throw new IllegalStateException(s"Unexpected row index filter type: 
$unexpected")
+    }
+  }
+
+  private def serializePayload(
+      spark: SparkSession,
+      partitionColumnCount: Int,
+      file: PartitionedFile,
+      descriptor: DeletionVectorDescriptor): Array[Byte] = {
+    val tablePath = resolveTablePath(spark, partitionColumnCount, file)
+    if (tablePath == null) {
+      throw new IllegalStateException(
+        "Unable to resolve Delta table path while materializing deletion 
vector payload")
+    }
+    val dvStore = new 
HadoopFileSystemDVStore(spark.sessionState.newHadoopConf())
+    StoredBitmap
+      .create(descriptor, tablePath)
+      .load(dvStore)
+      .serializeAsByteArray(RoaringBitmapArrayFormat.Portable)
+  }
+
+  private def resolveTablePath(
+      spark: SparkSession,
+      partitionColumnCount: Int,
+      file: PartitionedFile): Path = {
+    val fileParent = new 
Path(unescapePathName(file.filePath.toString)).getParent
+    var tablePath = fileParent
+    for (_ <- 0 until partitionColumnCount) {
+      tablePath = tablePath.getParent
+    }
+    if (tablePath != null && isDeltaTablePath(spark, tablePath)) {
+      return tablePath
+    }
+
+    var candidate = fileParent
+    while (candidate != null && !isDeltaTablePath(spark, candidate)) {
+      candidate = candidate.getParent
+    }
+    if (candidate != null) candidate else tablePath
+  }
+
+  private def isDeltaTablePath(spark: SparkSession, tablePath: Path): Boolean 
= {
+    val deltaLogPath = new Path(tablePath, "_delta_log")
+    try {
+      
deltaLogPath.getFileSystem(spark.sessionState.newHadoopConf()).exists(deltaLogPath)
+    } catch {
+      case NonFatal(_) => false
+    }
+  }
+
+  private def unescapePathName(path: String): String = {
+    if (path == null || path.indexOf('%') < 0) {
+      path
+    } else {
+      val builder = new StringBuilder(path.length)
+      var index = 0
+      while (index < path.length) {
+        if (path.charAt(index) == '%' && index + 2 < path.length) {
+          val high = Character.digit(path.charAt(index + 1), 16)
+          val low = Character.digit(path.charAt(index + 2), 16)
+          if (high >= 0 && low >= 0) {
+            builder.append(((high << 4) | low).toChar)
+            index += 3
+          } else {
+            builder.append(path.charAt(index))
+            index += 1
+          }
+        } else {
+          builder.append(path.charAt(index))
+          index += 1
+        }
+      }
+      builder.toString()
+    }
+  }
+}
diff --git 
a/backends-velox/src-delta40/test/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfoSuite.scala
 
b/backends-velox/src-delta40/test/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfoSuite.scala
new file mode 100644
index 0000000000..cbb70817ba
--- /dev/null
+++ 
b/backends-velox/src-delta40/test/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfoSuite.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.delta
+
+import org.apache.gluten.delta.DeltaDeletionVectorScanInfo.RowIndexFilterType
+
+import org.apache.spark.SparkConf
+import org.apache.spark.paths.SparkPath
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.delta.{DeltaLog, GlutenDeltaParquetFileFormat}
+import org.apache.spark.sql.delta.catalog.DeltaCatalog
+import org.apache.spark.sql.delta.test.DeltaSQLTestUtils
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.tags.ExtendedSQLTest
+
+import io.delta.sql.DeltaSparkSessionExtension
+import org.apache.hadoop.fs.Path
+
+@ExtendedSQLTest
+class DeltaDeletionVectorScanInfoSuite
+  extends QueryTest
+  with SharedSparkSession
+  with DeltaSQLTestUtils {
+
+  import testImplicits._
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set(StaticSQLConf.SPARK_SESSION_EXTENSIONS.key, 
classOf[DeltaSparkSessionExtension].getName)
+      .set(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION.key, 
classOf[DeltaCatalog].getName)
+      .set("spark.databricks.delta.snapshotPartitions", "2")
+  }
+
+  test("extracts essential Delta DV scan info from split metadata") {
+    withTempDir {
+      tempDir =>
+        val path = tempDir.getCanonicalPath
+        Seq((1, "a"), (2, "b"), (3, "c"), (4, "d"))
+          .toDF("id", "value")
+          .coalesce(1)
+          .write
+          .format("delta")
+          .save(path)
+
+        spark.sql(
+          s"ALTER TABLE delta.`$path` SET TBLPROPERTIES 
('delta.enableDeletionVectors' = true)")
+        spark.sql(s"DELETE FROM delta.`$path` WHERE id IN (3, 4)")
+
+        val dataFile = DeltaLog
+          .forTable(spark, new Path(path))
+          .update()
+          .allFiles
+          .collect()
+          .find(_.deletionVector != null)
+          .get
+        val partitionedFile = partitionedFileWithMetadata(
+          path,
+          dataFile.path,
+          dataFile.size,
+          Map(
+            GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED ->
+              dataFile.deletionVector.serializeToBase64(),
+            GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE -> 
"IF_CONTAINED",
+            "kept_key" -> "kept_value"
+          )
+        )
+
+        val scanInfo = DeltaDeletionVectorScanInfo.extract(spark, 0, 
partitionedFile)
+        val dvInfo = scanInfo.deletionVectorInfo
+
+        assert(dvInfo.hasDeletionVector)
+        assert(dvInfo.rowIndexFilterType == RowIndexFilterType.IF_CONTAINED)
+        assert(dvInfo.cardinality == dataFile.deletionVector.cardinality)
+        assert(dvInfo.serializedDeletionVector.nonEmpty)
+        assert(scanInfo.normalizedOtherMetadataColumns == Map("kept_key" -> 
"kept_value"))
+    }
+  }
+
+  test("returns keep-all scan info when Delta DV metadata is absent") {
+    withTempDir {
+      tempDir =>
+        val path = tempDir.getCanonicalPath
+        Seq((1, "a")).toDF("id", 
"value").coalesce(1).write.format("delta").save(path)
+
+        val dataFile = DeltaLog.forTable(spark, new 
Path(path)).update().allFiles.collect().head
+        val partitionedFile = partitionedFileWithMetadata(
+          path,
+          dataFile.path,
+          dataFile.size,
+          Map("kept_key" -> "kept_value"))
+
+        val scanInfo = DeltaDeletionVectorScanInfo.extract(spark, 0, 
partitionedFile)
+        val dvInfo = scanInfo.deletionVectorInfo
+
+        assert(!dvInfo.hasDeletionVector)
+        assert(dvInfo.rowIndexFilterType == RowIndexFilterType.KEEP_ALL)
+        assert(dvInfo.cardinality == 0L)
+        assert(dvInfo.serializedDeletionVector.isEmpty)
+        assert(scanInfo.normalizedOtherMetadataColumns == Map("kept_key" -> 
"kept_value"))
+    }
+  }
+
+  test("rejects partial Delta DV split metadata") {
+    withTempDir {
+      tempDir =>
+        val path = tempDir.getCanonicalPath
+        Seq((1, "a")).toDF("id", 
"value").coalesce(1).write.format("delta").save(path)
+
+        val dataFile = DeltaLog.forTable(spark, new 
Path(path)).update().allFiles.collect().head
+        val partitionedFile = partitionedFileWithMetadata(
+          path,
+          dataFile.path,
+          dataFile.size,
+          Map(GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE -> 
"IF_CONTAINED"))
+
+        val error = intercept[IllegalStateException] {
+          DeltaDeletionVectorScanInfo.extract(spark, 0, partitionedFile)
+        }
+        assert(error.getMessage.contains("must either be present or absent"))
+    }
+  }
+
+  private def partitionedFileWithMetadata(
+      tablePath: String,
+      relativeFilePath: String,
+      fileSize: Long,
+      metadata: Map[String, Object]): PartitionedFile = {
+    PartitionedFile(
+      partitionValues = InternalRow.empty,
+      filePath = SparkPath.fromPath(new Path(tablePath, relativeFilePath)),
+      start = 0L,
+      length = fileSize,
+      fileSize = fileSize,
+      otherConstantMetadataColumnValues = metadata
+    )
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to