This is an automated email from the ASF dual-hosted git repository.

zhztheplayer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new f75b869131 [VL][Delta] Add JVM Delta DV scan handoff (#12269)
f75b869131 is described below

commit f75b869131e957701145b19bc859c47272b8de74
Author: Mohammad Linjawi <[email protected]>
AuthorDate: Fri Jun 12 16:47:51 2026 +0300

    [VL][Delta] Add JVM Delta DV scan handoff (#12269)
---
 .../gluten/component/VeloxDeltaComponent.scala     |   4 +
 .../delta/DeltaDeletionVectorHandoffSuite.scala    |  68 ++++++++
 .../org/apache/spark/sql/delta/DeltaSuite.scala    |  28 +++
 .../delta/DeltaDeletionVectorHandoffSuite.scala    |  98 +++++++++++
 cpp/velox/compute/VeloxPlanConverter.cc            |  94 ++++++++++
 cpp/velox/compute/WholeStageResultIterator.cc      |  69 +++++++-
 cpp/velox/compute/delta/DeltaSplitInfo.h           |  46 +++++
 cpp/velox/substrait/SubstraitToVeloxPlan.cc        |  13 +-
 .../delta/DeltaDeletionVectorScanInfo.scala}       |  21 +--
 .../delta/DeltaDeletionVectorScanInfo.scala}       |  21 +--
 .../delta/DeltaDeletionVectorScanInfo.scala}       |  21 +--
 .../gluten/delta/DeltaDeletionVectorScanInfo.scala |  57 ++++++-
 .../gluten/delta/DeltaDeletionVectorScanInfo.scala |  57 ++++++-
 .../gluten/execution/DeltaScanTransformer.scala    |  40 +++--
 .../gluten/extension/DeltaPostTransformRules.scala | 190 ++++++++++++++++++++-
 .../apache/gluten/extension/OffloadDeltaScan.scala |  79 ++++++++-
 .../org/apache/gluten/execution/DeltaSuite.scala   |  22 ++-
 .../substrait/rel/DeltaLocalFilesBuilder.java      |  27 +--
 .../gluten/substrait/rel/DeltaLocalFilesNode.java  | 117 +++++++++++++
 .../gluten/substrait/rel/LocalFilesNode.java       |  21 +++
 .../substrait/proto/substrait/algebra.proto        |  13 ++
 21 files changed, 1013 insertions(+), 93 deletions(-)

diff --git 
a/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala
 
b/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala
index 0587e8b07f..164cf52886 100644
--- 
a/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala
+++ 
b/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala
@@ -36,6 +36,10 @@ class VeloxDeltaComponent extends Component {
 
   override def injectRules(injector: Injector): Unit = {
     val legacy = injector.gluten.legacy
+    // Deletion-vector scans need no Gluten-side logical preprocessing: 
Delta's own
+    // PreprocessTableWithDVsStrategy injects the skip-row column and filter 
during physical
+    // planning, DeltaPostTransformRules.nativeDeletionVectorRule strips them 
when the scan
+    // offloads, and DeltaScanTransformer materializes the per-file DV 
payloads for Velox.
     legacy.injectTransform {
       c =>
         val offload = Seq(OffloadDeltaScan(), OffloadDeltaProject(), 
OffloadDeltaFilter())
diff --git 
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaDeletionVectorHandoffSuite.scala
 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaDeletionVectorHandoffSuite.scala
new file mode 100644
index 0000000000..f5510a9525
--- /dev/null
+++ 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaDeletionVectorHandoffSuite.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta
+
+import org.apache.gluten.execution.DeltaScanTransformer
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.delta.test.{DeltaSQLCommandTest, DeltaSQLTestUtils}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.tags.ExtendedSQLTest
+
+import org.apache.hadoop.fs.Path
+
+@ExtendedSQLTest
+class DeltaDeletionVectorHandoffSuite
+  extends QueryTest
+  with SharedSparkSession
+  with DeltaSQLTestUtils
+  with DeltaSQLCommandTest {
+
+  import testImplicits._
+
+  test("Spark 3.5 Delta DV scan handoff should filter deleted rows") {
+    withTempDir {
+      tempDir =>
+        val path = tempDir.getCanonicalPath
+        Seq((1, "a"), (2, "b"), (3, "c"), (4, "d"))
+          .toDF("id", "value")
+          .coalesce(1)
+          .write
+          .format("delta")
+          .save(path)
+
+        spark.sql(
+          s"ALTER TABLE delta.`$path` SET TBLPROPERTIES 
('delta.enableDeletionVectors' = true)")
+        spark.sql(s"DELETE FROM delta.`$path` WHERE id IN (3, 4)")
+
+        val log = DeltaLog.forTable(spark, new Path(path))
+        val addFileWithDv = 
log.update().allFiles.collect().find(_.deletionVector != null)
+        assert(addFileWithDv.nonEmpty)
+
+        val dataFile = addFileWithDv.get
+        assert(dataFile.deletionVector.cardinality == 2L)
+
+        val df = spark.read.format("delta").load(path)
+        val executedPlan = df.queryExecution.executedPlan
+        assert(executedPlan.collect { case _: DeltaScanTransformer => true 
}.nonEmpty)
+        val planText = executedPlan.toString()
+        assert(!planText.contains("__delta_internal_is_row_deleted"))
+        assert(!planText.contains("__delta_internal_row_index"))
+        checkAnswer(df, Seq((1, "a"), (2, "b")).toDF())
+    }
+  }
+}
diff --git 
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala
 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala
index f265168ddb..a60054031c 100644
--- 
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala
+++ 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala
@@ -197,6 +197,34 @@ class DeltaSuite
     checkAnswer(data.toDF(), Row(1) :: Row(2) :: Row(3) :: Row(4) :: Row(5) :: 
Row(6) :: Nil)
   }
 
+  test("DV scan without metadata row index falls back and stays correct") {
+    withTempDir {
+      tempDir =>
+        val path = tempDir.getCanonicalPath
+        Seq((1, "a"), (2, "b"), (3, "c"), (4, "d"))
+          .toDF("id", "value")
+          .coalesce(1)
+          .write
+          .format("delta")
+          .save(path)
+
+        spark.sql(
+          s"ALTER TABLE delta.`$path` SET TBLPROPERTIES 
('delta.enableDeletionVectors' = true)")
+
+        withSQLConf(DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX.key 
-> "false") {
+          spark.sql(s"DELETE FROM delta.`$path` WHERE id IN (3, 4)")
+
+          val log = DeltaLog.forTable(spark, new Path(path))
+          assert(log.update().allFiles.collect().exists(_.deletionVector != 
null))
+
+          val df = spark.read.format("delta").load(path)
+          val executedPlan = df.queryExecution.executedPlan
+          assert(executedPlan.collect { case _: DeltaScanTransformer => true 
}.isEmpty)
+          checkAnswer(df, Seq(Row(1, "a"), Row(2, "b")))
+        }
+    }
+  }
+
   test("partitioned append - nulls") {
     val tempDir = Utils.createTempDir()
     Seq(Some(1), None)
diff --git 
a/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaDeletionVectorHandoffSuite.scala
 
b/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaDeletionVectorHandoffSuite.scala
new file mode 100644
index 0000000000..dda547b015
--- /dev/null
+++ 
b/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaDeletionVectorHandoffSuite.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta
+
+import org.apache.gluten.execution.DeltaScanTransformer
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.test.{DeltaSQLCommandTest, DeltaSQLTestUtils}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.tags.ExtendedSQLTest
+
+import org.apache.hadoop.fs.Path
+
+@ExtendedSQLTest
+class DeltaDeletionVectorHandoffSuite
+  extends QueryTest
+  with SharedSparkSession
+  with DeltaSQLTestUtils
+  with DeltaSQLCommandTest {
+
+  import testImplicits._
+
+  test("Spark 4 Delta DV scan should fall back when metadata row index is 
disabled") {
+    withTempDir {
+      tempDir =>
+        val path = tempDir.getCanonicalPath
+        Seq((1, "a"), (2, "b"), (3, "c"), (4, "d"))
+          .toDF("id", "value")
+          .coalesce(1)
+          .write
+          .format("delta")
+          .save(path)
+
+        spark.sql(
+          s"ALTER TABLE delta.`$path` SET TBLPROPERTIES 
('delta.enableDeletionVectors' = true)")
+        spark.sql(s"DELETE FROM delta.`$path` WHERE id IN (3, 4)")
+
+        val log = DeltaLog.forTable(spark, new Path(path))
+        assert(log.update().allFiles.collect().exists(_.deletionVector != 
null))
+
+        // This covers scan behavior over an existing DV. Keep the 
no-metadata-row-index
+        // path on Spark until the native path can prove the same contract for 
DML DVs.
+        withSQLConf(DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX.key 
-> "false") {
+          val df = spark.read.format("delta").load(path)
+          val executedPlan = df.queryExecution.executedPlan
+          assert(executedPlan.collect { case _: DeltaScanTransformer => true 
}.isEmpty)
+          checkAnswer(df, Seq((1, "a"), (2, "b")).toDF())
+        }
+    }
+  }
+
+  test("Spark 4 Delta DV scan handoff should filter deleted rows") {
+    withTempDir {
+      tempDir =>
+        val path = tempDir.getCanonicalPath
+        Seq((1, "a"), (2, "b"), (3, "c"), (4, "d"))
+          .toDF("id", "value")
+          .coalesce(1)
+          .write
+          .format("delta")
+          .save(path)
+
+        spark.sql(
+          s"ALTER TABLE delta.`$path` SET TBLPROPERTIES 
('delta.enableDeletionVectors' = true)")
+        spark.sql(s"DELETE FROM delta.`$path` WHERE id IN (3, 4)")
+
+        val log = DeltaLog.forTable(spark, new Path(path))
+        val addFileWithDv = 
log.update().allFiles.collect().find(_.deletionVector != null)
+        assert(addFileWithDv.nonEmpty)
+
+        val dataFile = addFileWithDv.get
+        assert(dataFile.deletionVector.cardinality == 2L)
+
+        val df = spark.read.format("delta").load(path)
+        val executedPlan = df.queryExecution.executedPlan
+        assert(executedPlan.collect { case _: DeltaScanTransformer => true 
}.nonEmpty)
+        val planText = executedPlan.toString()
+        assert(!planText.contains("__delta_internal_is_row_deleted"))
+        assert(!planText.contains("__delta_internal_row_index"))
+        checkAnswer(df, Seq((1, "a"), (2, "b")).toDF())
+    }
+  }
+}
diff --git a/cpp/velox/compute/VeloxPlanConverter.cc 
b/cpp/velox/compute/VeloxPlanConverter.cc
index f3ffab59a6..f47ca61050 100644
--- a/cpp/velox/compute/VeloxPlanConverter.cc
+++ b/cpp/velox/compute/VeloxPlanConverter.cc
@@ -17,8 +17,13 @@
 
 #include "VeloxPlanConverter.h"
 #include <filesystem>
+#include <limits>
+#include <optional>
 
+#include <google/protobuf/any.pb.h>
+#include <google/protobuf/wrappers.pb.h>
 #include "config/GlutenConfig.h"
+#include "delta/DeltaSplitInfo.h"
 #include "iceberg/IcebergPlanConverter.h"
 #include "operators/plannodes/IteratorSplit.h"
 
@@ -48,6 +53,87 @@ VeloxPlanConverter::VeloxPlanConverter(
 }
 
 namespace {
+std::optional<std::string> unpackMetadataValue(const google::protobuf::Any& 
value) {
+  google::protobuf::BytesValue bytesValue;
+  if (value.UnpackTo(&bytesValue)) {
+    return bytesValue.value();
+  }
+
+  google::protobuf::StringValue stringValue;
+  if (value.UnpackTo(&stringValue)) {
+    return stringValue.value();
+  }
+
+  google::protobuf::Int32Value int32Value;
+  if (value.UnpackTo(&int32Value)) {
+    return std::to_string(int32Value.value());
+  }
+
+  google::protobuf::Int64Value int64Value;
+  if (value.UnpackTo(&int64Value)) {
+    return std::to_string(int64Value.value());
+  }
+
+  google::protobuf::DoubleValue doubleValue;
+  if (value.UnpackTo(&doubleValue)) {
+    return std::to_string(doubleValue.value());
+  }
+
+  // Matches the string encoding the JVM side uses for booleans, which are 
packed through
+  // SubstraitUtil.convertJavaObjectToAny's toString fallback rather than as 
BoolValue.
+  google::protobuf::BoolValue boolValue;
+  if (value.UnpackTo(&boolValue)) {
+    return boolValue.value() ? "true" : "false";
+  }
+
+  return std::nullopt;
+}
+
+delta::DeltaRowIndexFilterType parseDeltaRowIndexFilterType(int filterType) {
+  switch (filterType) {
+    case 1:
+      return delta::DeltaRowIndexFilterType::kIfContained;
+    case 2:
+      return delta::DeltaRowIndexFilterType::kIfNotContained;
+    case 0:
+    default:
+      return delta::DeltaRowIndexFilterType::kKeepAll;
+  }
+}
+
+std::shared_ptr<DeltaSplitInfo> parseDeltaSplitInfo(
+    const substrait::ReadRel_LocalFiles_FileOrFiles& file,
+    std::shared_ptr<SplitInfo> splitInfo) {
+  auto deltaSplitInfo = std::dynamic_pointer_cast<DeltaSplitInfo>(splitInfo)
+      ? std::dynamic_pointer_cast<DeltaSplitInfo>(splitInfo)
+      : std::make_shared<DeltaSplitInfo>(*splitInfo);
+
+  deltaSplitInfo->format = dwio::common::FileFormat::PARQUET;
+  const auto& deltaReadOptions = file.delta();
+  deltaSplitInfo->rowIndexFilterTypes.emplace_back(
+      parseDeltaRowIndexFilterType(deltaReadOptions.row_index_filter_type()));
+
+  if (!deltaReadOptions.has_deletion_vector()) {
+    deltaSplitInfo->deletionVectors.emplace_back(std::nullopt);
+    return deltaSplitInfo;
+  }
+
+  auto serializedPayload = deltaReadOptions.serialized_deletion_vector();
+  VELOX_USER_CHECK(!serializedPayload.empty(), "Delta split has a deletion 
vector without a serialized payload");
+  VELOX_USER_CHECK_LE(
+      serializedPayload.size(),
+      static_cast<size_t>(std::numeric_limits<int32_t>::max()),
+      "Delta deletion vector serialized payload is too large");
+  const auto cardinality = 
static_cast<uint64_t>(deltaReadOptions.deletion_vector_cardinality());
+  auto payload = std::make_shared<std::string>(std::move(serializedPayload));
+  const SplitPayloadBufferView payloadView{
+      reinterpret_cast<const uint8_t*>(payload->data()), 
static_cast<int32_t>(payload->size())};
+  deltaSplitInfo->deletionVectors.emplace_back(
+      delta::DeltaDeletionVectorDescriptor::serialized(cardinality, 
payloadView));
+  deltaSplitInfo->deletionVectorPayloads.emplace_back(std::move(payload));
+  return deltaSplitInfo;
+}
+
 std::shared_ptr<SplitInfo> parseScanSplitInfo(
     const facebook::velox::config::ConfigBase* veloxCfg,
     const 
google::protobuf::RepeatedPtrField<substrait::ReadRel_LocalFiles_FileOrFiles>& 
fileList) {
@@ -75,6 +161,11 @@ std::shared_ptr<SplitInfo> parseScanSplitInfo(
     for (const auto& metadataColumn : file.metadata_columns()) {
       metadataColumnMap[metadataColumn.key()] = metadataColumn.value();
     }
+    for (const auto& otherMetadataColumn : 
file.other_const_metadata_columns()) {
+      if (auto unpackedValue = 
unpackMetadataValue(otherMetadataColumn.value())) {
+        metadataColumnMap[otherMetadataColumn.key()] = 
std::move(*unpackedValue);
+      }
+    }
     splitInfo->metadataColumns.emplace_back(metadataColumnMap);
 
     splitInfo->paths.emplace_back(file.uri_file());
@@ -103,6 +194,9 @@ std::shared_ptr<SplitInfo> parseScanSplitInfo(
       case SubstraitFileFormatCase::kIceberg:
         splitInfo = IcebergPlanConverter::parseIcebergSplitInfo(file, 
std::move(splitInfo));
         break;
+      case SubstraitFileFormatCase::kDelta:
+        splitInfo = parseDeltaSplitInfo(file, std::move(splitInfo));
+        break;
       default:
         splitInfo->format = dwio::common::FileFormat::UNKNOWN;
         break;
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc 
b/cpp/velox/compute/WholeStageResultIterator.cc
index c3ac095cdc..753ccb2a68 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -15,9 +15,13 @@
  * limitations under the License.
  */
 #include "WholeStageResultIterator.h"
+#include <optional>
 #include "VeloxBackend.h"
 #include "VeloxPlanConverter.h"
 #include "VeloxRuntime.h"
+#include "compute/delta/DeltaConnector.h"
+#include "compute/delta/DeltaSplit.h"
+#include "compute/delta/DeltaSplitInfo.h"
 #include "config/VeloxConfig.h"
 #include "utils/ConfigExtractor.h"
 #include "velox/connectors/hive/HiveConfig.h"
@@ -69,6 +73,36 @@ const std::string kWriteIOTime = "writeIOWallNanos";
 
 // others
 const std::string kHiveDefaultPartition = "__HIVE_DEFAULT_PARTITION__";
+const std::string kDeltaTableFormat = "delta";
+
+const velox::core::TableScanNode* findTableScanNodeById(
+    const std::shared_ptr<const velox::core::PlanNode>& planNode,
+    const velox::core::PlanNodeId& nodeId) {
+  if (planNode == nullptr) {
+    return nullptr;
+  }
+
+  if (planNode->id() == nodeId) {
+    return dynamic_cast<const velox::core::TableScanNode*>(planNode.get());
+  }
+
+  for (const auto& source : planNode->sources()) {
+    if (const auto* found = findTableScanNodeById(source, nodeId)) {
+      return found;
+    }
+  }
+  return nullptr;
+}
+
+std::string connectorIdForScanNode(
+    const std::shared_ptr<const velox::core::PlanNode>& planNode,
+    const velox::core::PlanNodeId& nodeId) {
+  const auto* tableScanNode = findTableScanNodeById(planNode, nodeId);
+  if (tableScanNode == nullptr) {
+    return "";
+  }
+  return tableScanNode->tableHandle()->connectorId();
+}
 
 } // namespace
 
@@ -134,7 +168,8 @@ WholeStageResultIterator::WholeStageResultIterator(
     throw std::runtime_error("Invalid scan information.");
   }
 
-  for (const auto& scanInfo : scanInfos) {
+  for (size_t scanInfoIdx = 0; scanInfoIdx < scanInfos.size(); ++scanInfoIdx) {
+    const auto& scanInfo = scanInfos[scanInfoIdx];
     // Get the information for TableScan.
     // Partition index in scan info is not used.
     const auto& paths = scanInfo->paths;
@@ -144,6 +179,9 @@ WholeStageResultIterator::WholeStageResultIterator(
     const auto& format = scanInfo->format;
     const auto& partitionColumns = scanInfo->partitionColumns;
     const auto& metadataColumns = scanInfo->metadataColumns;
+    const auto scanNodeConnectorId = connectorIdForScanNode(veloxPlan_, 
scanNodeIds_[scanInfoIdx]);
+    const auto deltaSplitInfo = 
std::dynamic_pointer_cast<DeltaSplitInfo>(scanInfo);
+    const bool isDeltaScan = scanNodeConnectorId == connectorIds_.delta || 
deltaSplitInfo != nullptr;
 #ifdef GLUTEN_ENABLE_GPU
     // Under the pre-condition that all the split infos has same partition 
column and format.
     const auto canUseCudfConnector = scanInfo->canUseCudfConnector();
@@ -177,10 +215,37 @@ WholeStageResultIterator::WholeStageResultIterator(
             deleteFiles,
             metadataColumn,
             properties[idx]);
+      } else if (isDeltaScan) {
+        std::unordered_map<std::string, std::string> 
customSplitInfo{{"table_format", kDeltaTableFormat}};
+        std::optional<gluten::delta::DeltaDeletionVectorDescriptor> 
deletionVector = std::nullopt;
+        auto rowIndexFilterType = 
gluten::delta::DeltaRowIndexFilterType::kKeepAll;
+        if (deltaSplitInfo != nullptr) {
+          VELOX_USER_CHECK_LT(idx, deltaSplitInfo->deletionVectors.size());
+          VELOX_USER_CHECK_LT(idx, deltaSplitInfo->rowIndexFilterTypes.size());
+          deletionVector = deltaSplitInfo->deletionVectors[idx];
+          rowIndexFilterType = deltaSplitInfo->rowIndexFilterTypes[idx];
+        }
+        split = std::make_shared<gluten::delta::HiveDeltaSplit>(
+            connectorIds_.delta,
+            paths[idx],
+            format,
+            starts[idx],
+            lengths[idx],
+            partitionKeys,
+            std::nullopt,
+            customSplitInfo,
+            nullptr,
+            std::unordered_map<std::string, std::string>(),
+            true,
+            deletionVector,
+            std::nullopt,
+            rowIndexFilterType,
+            metadataColumn,
+            properties[idx]);
       } else {
         auto connectorId = connectorIds_.hive;
 #ifdef GLUTEN_ENABLE_GPU
-        if (canUseCudfConnector && enableCudf_ &&
+        if (connectorId == connectorIds_.hive && canUseCudfConnector && 
enableCudf_ &&
             veloxCfg_->get<bool>(kCudfEnableTableScan, 
kCudfEnableTableScanDefault)) {
           connectorId = connectorIds_.cudfHive;
         }
diff --git a/cpp/velox/compute/delta/DeltaSplitInfo.h 
b/cpp/velox/compute/delta/DeltaSplitInfo.h
new file mode 100644
index 0000000000..c02e52f6b8
--- /dev/null
+++ b/cpp/velox/compute/delta/DeltaSplitInfo.h
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include "compute/delta/DeltaSplit.h"
+#include "substrait/SubstraitToVeloxPlan.h"
+
+namespace gluten {
+
+struct DeltaSplitInfo : SplitInfo {
+  std::vector<std::shared_ptr<std::string>> deletionVectorPayloads;
+  std::vector<std::optional<delta::DeltaDeletionVectorDescriptor>> 
deletionVectors;
+  std::vector<delta::DeltaRowIndexFilterType> rowIndexFilterTypes;
+
+  DeltaSplitInfo(const SplitInfo& splitInfo) : SplitInfo(splitInfo) {
+    deletionVectors.reserve(splitInfo.paths.capacity());
+    deletionVectorPayloads.reserve(splitInfo.paths.capacity());
+    rowIndexFilterTypes.reserve(splitInfo.paths.capacity());
+
+    const auto previousFileCount = splitInfo.paths.empty() ? 0 : 
splitInfo.paths.size() - 1;
+    deletionVectors.resize(previousFileCount, std::nullopt);
+    rowIndexFilterTypes.resize(previousFileCount, 
delta::DeltaRowIndexFilterType::kKeepAll);
+  }
+};
+
+} // namespace gluten
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc 
b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index 73eef45245..2d7e65734b 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -19,6 +19,8 @@
 
 #include "TypeUtils.h"
 #include "VariantToVectorConverter.h"
+#include "compute/delta/DeltaConnector.h"
+#include "compute/delta/DeltaSplitInfo.h"
 #include "jni/JniHashTable.h"
 #include "operators/hashjoin/HashTableBuilder.h"
 #include "operators/plannodes/RowVectorStream.h"
@@ -58,6 +60,12 @@ bool useCudfTableHandle(const 
std::vector<std::shared_ptr<SplitInfo>>& splitInfo
 #endif
 }
 
+// Delta scans are recognized structurally: parsing the substrait delta file 
format case yields a
+// typed DeltaSplitInfo.
+bool isDeltaSplitInfo(const std::shared_ptr<SplitInfo>& splitInfo) {
+  return std::dynamic_pointer_cast<DeltaSplitInfo>(splitInfo) != nullptr;
+}
+
 core::SortOrder toSortOrder(const ::substrait::SortField& sortField) {
   switch (sortField.direction()) {
     case ::substrait::SortField_SortDirection_SORT_DIRECTION_ASC_NULLS_FIRST:
@@ -1575,8 +1583,9 @@ core::PlanNodePtr 
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
 
   connector::ConnectorTableHandlePtr tableHandle;
   auto remainingFilter = readRel.has_filter() ? 
exprConverter_->toVeloxExpr(readRel.filter(), baseSchema) : nullptr;
-  auto connectorId = connectorIds_.hive;
-  if (useCudfTableHandle(splitInfos_) && 
veloxCfg_->get<bool>(kCudfEnableTableScan, kCudfEnableTableScanDefault) &&
+  auto connectorId = isDeltaSplitInfo(splitInfo) ? connectorIds_.delta : 
connectorIds_.hive;
+  if (connectorId == connectorIds_.hive && useCudfTableHandle(splitInfos_) &&
+      veloxCfg_->get<bool>(kCudfEnableTableScan, kCudfEnableTableScanDefault) 
&&
       veloxCfg_->get<bool>(kCudfEnabled, kCudfEnabledDefault)) {
 #ifdef GLUTEN_ENABLE_GPU
     connectorId = connectorIds_.cudfHive;
diff --git 
a/gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala
 
b/gluten-delta/src-delta20/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
similarity index 57%
copy from 
gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala
copy to 
gluten-delta/src-delta20/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
index 5fe1b4ba86..af8e8df7da 100644
--- 
a/gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala
+++ 
b/gluten-delta/src-delta20/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
@@ -14,19 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.extension
+package org.apache.gluten.delta
 
-import org.apache.gluten.execution.DeltaScanTransformer
-import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
+import org.apache.gluten.substrait.rel.DeltaLocalFilesNode.DeltaFileReadOptions
 
-import org.apache.spark.sql.delta.DeltaParquetFileFormat
-import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.datasources.PartitionedFile
 
-case class OffloadDeltaScan() extends OffloadSingleNode {
-  override def offload(plan: SparkPlan): SparkPlan = plan match {
-    case scan: FileSourceScanExec
-        if scan.relation.fileFormat.getClass == 
classOf[DeltaParquetFileFormat] =>
-      DeltaScanTransformer(scan)
-    case other => other
-  }
+import java.util.{Map => JMap}
+
+/** Reading deletion vectors natively requires Delta 3.3+, so there is nothing 
to materialize. */
+object DeltaDeletionVectorScanInfo {
+  def normalize(partitionColumnCount: Int, partitionFiles: 
Seq[PartitionedFile])
+      : Option[(Seq[JMap[String, Object]], Seq[DeltaFileReadOptions])] = None
 }
diff --git 
a/gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala
 
b/gluten-delta/src-delta23/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
similarity index 57%
copy from 
gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala
copy to 
gluten-delta/src-delta23/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
index 5fe1b4ba86..af8e8df7da 100644
--- 
a/gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala
+++ 
b/gluten-delta/src-delta23/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
@@ -14,19 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.extension
+package org.apache.gluten.delta
 
-import org.apache.gluten.execution.DeltaScanTransformer
-import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
+import org.apache.gluten.substrait.rel.DeltaLocalFilesNode.DeltaFileReadOptions
 
-import org.apache.spark.sql.delta.DeltaParquetFileFormat
-import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.datasources.PartitionedFile
 
-case class OffloadDeltaScan() extends OffloadSingleNode {
-  override def offload(plan: SparkPlan): SparkPlan = plan match {
-    case scan: FileSourceScanExec
-        if scan.relation.fileFormat.getClass == 
classOf[DeltaParquetFileFormat] =>
-      DeltaScanTransformer(scan)
-    case other => other
-  }
+import java.util.{Map => JMap}
+
+/** Reading deletion vectors natively requires Delta 3.3+, so there is nothing 
to materialize. */
+object DeltaDeletionVectorScanInfo {
+  def normalize(partitionColumnCount: Int, partitionFiles: 
Seq[PartitionedFile])
+      : Option[(Seq[JMap[String, Object]], Seq[DeltaFileReadOptions])] = None
 }
diff --git 
a/gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala
 
b/gluten-delta/src-delta24/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
similarity index 57%
copy from 
gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala
copy to 
gluten-delta/src-delta24/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
index 5fe1b4ba86..af8e8df7da 100644
--- 
a/gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala
+++ 
b/gluten-delta/src-delta24/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
@@ -14,19 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.extension
+package org.apache.gluten.delta
 
-import org.apache.gluten.execution.DeltaScanTransformer
-import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
+import org.apache.gluten.substrait.rel.DeltaLocalFilesNode.DeltaFileReadOptions
 
-import org.apache.spark.sql.delta.DeltaParquetFileFormat
-import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.datasources.PartitionedFile
 
-case class OffloadDeltaScan() extends OffloadSingleNode {
-  override def offload(plan: SparkPlan): SparkPlan = plan match {
-    case scan: FileSourceScanExec
-        if scan.relation.fileFormat.getClass == 
classOf[DeltaParquetFileFormat] =>
-      DeltaScanTransformer(scan)
-    case other => other
-  }
+import java.util.{Map => JMap}
+
+/** Reading deletion vectors natively requires Delta 3.3+, so there is nothing 
to materialize. */
+object DeltaDeletionVectorScanInfo {
+  def normalize(partitionColumnCount: Int, partitionFiles: 
Seq[PartitionedFile])
+      : Option[(Seq[JMap[String, Object]], Seq[DeltaFileReadOptions])] = None
 }
diff --git 
a/backends-velox/src-delta33/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
 
b/gluten-delta/src-delta33/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
similarity index 76%
rename from 
backends-velox/src-delta33/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
rename to 
gluten-delta/src-delta33/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
index 8ada3d755c..263d0ffb35 100644
--- 
a/backends-velox/src-delta33/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
+++ 
b/gluten-delta/src-delta33/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
@@ -17,9 +17,11 @@
 package org.apache.gluten.delta
 
 import org.apache.gluten.sql.shims.SparkShimLoader
+import org.apache.gluten.substrait.rel.DeltaLocalFilesNode
+import org.apache.gluten.substrait.rel.DeltaLocalFilesNode.DeltaFileReadOptions
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.delta.GlutenDeltaParquetFileFormat
+import org.apache.spark.sql.delta.DeltaParquetFileFormat
 import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
 import org.apache.spark.sql.delta.deletionvectors.{RoaringBitmapArrayFormat, 
StoredBitmap}
 import org.apache.spark.sql.delta.storage.dv.HadoopFileSystemDVStore
@@ -27,7 +29,7 @@ import 
org.apache.spark.sql.execution.datasources.PartitionedFile
 
 import org.apache.hadoop.fs.Path
 
-import java.util.{ArrayList => JArrayList}
+import java.util.{Map => JMap}
 
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
@@ -51,9 +53,27 @@ object DeltaDeletionVectorScanInfo {
       deletionVectorInfo: DeletionVectorInfo)
 
   private val RowIndexFilterIdEncoded =
-    GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED
+    DeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED
   private val RowIndexFilterTypeKey =
-    GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE
+    DeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE
+
+  /**
+   * Materializes per-file Delta DV read options for a split, alongside each 
file's metadata with
+   * the DV bookkeeping keys stripped. Returns None when no file in the split 
carries a deletion
+   * vector, so callers can keep the generic split representation.
+   */
+  def normalize(partitionColumnCount: Int, partitionFiles: 
Seq[PartitionedFile])
+      : Option[(Seq[JMap[String, Object]], Seq[DeltaFileReadOptions])] = {
+    val scanInfos = extractAll(activeSparkSession, partitionColumnCount, 
partitionFiles)
+    if (scanInfos.exists(_.deletionVectorInfo.hasDeletionVector)) {
+      Some(
+        (
+          scanInfos.map(_.normalizedOtherMetadataColumns.asJava),
+          scanInfos.map(info => 
toDeltaFileReadOptions(info.deletionVectorInfo))))
+    } else {
+      None
+    }
+  }
 
   def extract(
       spark: SparkSession,
@@ -72,11 +92,30 @@ object DeltaDeletionVectorScanInfo {
     files.map(extract(spark, partitionColumnCount, _))
   }
 
-  def extractAllFromJava(
-      spark: SparkSession,
-      partitionColumnCount: Int,
-      files: java.util.List[PartitionedFile]): 
java.util.List[PartitionFileScanInfo] = {
-    new JArrayList(extractAll(spark, partitionColumnCount, 
files.asScala.toSeq).asJava)
+  private def toDeltaFileReadOptions(dvInfo: DeletionVectorInfo): 
DeltaFileReadOptions = {
+    new DeltaFileReadOptions(
+      toSubstraitRowIndexFilterType(dvInfo.rowIndexFilterType),
+      dvInfo.hasDeletionVector,
+      dvInfo.cardinality,
+      dvInfo.serializedDeletionVector)
+  }
+
+  private def toSubstraitRowIndexFilterType(
+      filterType: RowIndexFilterType): DeltaLocalFilesNode.RowIndexFilterType 
= {
+    filterType match {
+      case IF_CONTAINED => DeltaLocalFilesNode.RowIndexFilterType.IF_CONTAINED
+      case IF_NOT_CONTAINED => 
DeltaLocalFilesNode.RowIndexFilterType.IF_NOT_CONTAINED
+      case _ => DeltaLocalFilesNode.RowIndexFilterType.KEEP_ALL
+    }
+  }
+
+  private def activeSparkSession: SparkSession = {
+    SparkSession.getActiveSession
+      .orElse(SparkSession.getDefaultSession)
+      .getOrElse {
+        throw new IllegalStateException(
+          "Active SparkSession is required to materialize Delta deletion 
vectors")
+      }
   }
 
   private def extractDeletionVectorInfo(
diff --git 
a/backends-velox/src-delta40/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
 
b/gluten-delta/src-delta40/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
similarity index 77%
rename from 
backends-velox/src-delta40/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
rename to 
gluten-delta/src-delta40/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
index 69a5dc3078..cddc8849fc 100644
--- 
a/backends-velox/src-delta40/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
+++ 
b/gluten-delta/src-delta40/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
@@ -17,9 +17,11 @@
 package org.apache.gluten.delta
 
 import org.apache.gluten.sql.shims.SparkShimLoader
+import org.apache.gluten.substrait.rel.DeltaLocalFilesNode
+import org.apache.gluten.substrait.rel.DeltaLocalFilesNode.DeltaFileReadOptions
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.delta.GlutenDeltaParquetFileFormat
+import org.apache.spark.sql.delta.DeltaParquetFileFormat
 import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
 import org.apache.spark.sql.delta.deletionvectors.{RoaringBitmapArrayFormat, 
StoredBitmap}
 import org.apache.spark.sql.delta.storage.dv.HadoopFileSystemDVStore
@@ -27,7 +29,7 @@ import 
org.apache.spark.sql.execution.datasources.PartitionedFile
 
 import org.apache.hadoop.fs.Path
 
-import java.util.{ArrayList => JArrayList}
+import java.util.{Map => JMap}
 
 import scala.collection.JavaConverters._
 import scala.util.Try
@@ -52,9 +54,27 @@ object DeltaDeletionVectorScanInfo {
       deletionVectorInfo: DeletionVectorInfo)
 
   private val RowIndexFilterIdEncoded =
-    GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED
+    DeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED
   private val RowIndexFilterTypeKey =
-    GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE
+    DeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE
+
+  /**
+   * Materializes per-file Delta DV read options for a split, alongside each 
file's metadata with
+   * the DV bookkeeping keys stripped. Returns None when no file in the split 
carries a deletion
+   * vector, so callers can keep the generic split representation.
+   */
+  def normalize(partitionColumnCount: Int, partitionFiles: 
Seq[PartitionedFile])
+      : Option[(Seq[JMap[String, Object]], Seq[DeltaFileReadOptions])] = {
+    val scanInfos = extractAll(activeSparkSession, partitionColumnCount, 
partitionFiles)
+    if (scanInfos.exists(_.deletionVectorInfo.hasDeletionVector)) {
+      Some(
+        (
+          scanInfos.map(_.normalizedOtherMetadataColumns.asJava),
+          scanInfos.map(info => 
toDeltaFileReadOptions(info.deletionVectorInfo))))
+    } else {
+      None
+    }
+  }
 
   def extract(
       spark: SparkSession,
@@ -73,11 +93,30 @@ object DeltaDeletionVectorScanInfo {
     files.map(extract(spark, partitionColumnCount, _))
   }
 
-  def extractAllFromJava(
-      spark: SparkSession,
-      partitionColumnCount: Int,
-      files: java.util.List[PartitionedFile]): 
java.util.List[PartitionFileScanInfo] = {
-    new JArrayList(extractAll(spark, partitionColumnCount, 
files.asScala.toSeq).asJava)
+  private def toDeltaFileReadOptions(dvInfo: DeletionVectorInfo): 
DeltaFileReadOptions = {
+    new DeltaFileReadOptions(
+      toSubstraitRowIndexFilterType(dvInfo.rowIndexFilterType),
+      dvInfo.hasDeletionVector,
+      dvInfo.cardinality,
+      dvInfo.serializedDeletionVector)
+  }
+
+  private def toSubstraitRowIndexFilterType(
+      filterType: RowIndexFilterType): DeltaLocalFilesNode.RowIndexFilterType 
= {
+    filterType match {
+      case IF_CONTAINED => DeltaLocalFilesNode.RowIndexFilterType.IF_CONTAINED
+      case IF_NOT_CONTAINED => 
DeltaLocalFilesNode.RowIndexFilterType.IF_NOT_CONTAINED
+      case _ => DeltaLocalFilesNode.RowIndexFilterType.KEEP_ALL
+    }
+  }
+
+  private def activeSparkSession: SparkSession = {
+    SparkSession.getActiveSession
+      .orElse(SparkSession.getDefaultSession)
+      .getOrElse {
+        throw new IllegalStateException(
+          "Active SparkSession is required to materialize Delta deletion 
vectors")
+      }
   }
 
   private def extractDeletionVectorInfo(
diff --git 
a/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala
 
b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala
index 154b735554..6ac644622d 100644
--- 
a/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala
+++ 
b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala
@@ -16,19 +16,24 @@
  */
 package org.apache.gluten.execution
 
+import org.apache.gluten.delta.DeltaDeletionVectorScanInfo
 import org.apache.gluten.sql.shims.SparkShimLoader
+import org.apache.gluten.substrait.rel.{DeltaLocalFilesBuilder, 
LocalFilesNode, SplitInfo}
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
+import org.apache.spark.Partition
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, Expression}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.connector.read.streaming.SparkDataStream
 import org.apache.spark.sql.delta.{DeltaParquetFileFormat, NoMapping}
 import org.apache.spark.sql.execution.FileSourceScanExec
-import org.apache.spark.sql.execution.datasources.HadoopFsRelation
+import org.apache.spark.sql.execution.datasources.{FilePartition, 
HadoopFsRelation}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.collection.BitSet
 
+import scala.collection.JavaConverters._
+
 case class DeltaScanTransformer(
     @transient override val relation: HadoopFsRelation,
     @transient stream: Option[SparkDataStream],
@@ -84,16 +89,30 @@ case class DeltaScanTransformer(
     case _ => dataFilters
   }
 
-  override protected def doValidateInternal(): ValidationResult = {
-    if (
-      requiredSchema.fields.exists(
-        _.name == "__delta_internal_is_row_deleted") || 
requiredSchema.fields.exists(
-        _.name == "__delta_internal_row_index")
-    ) {
-      return ValidationResult.failed(s"Deletion vector is not supported in 
native.")
+  /**
+   * Decorates the generically built split infos with per-file deletion-vector 
read options so the
+   * native Delta scan can apply DV filtering. Delta-specific extraction 
happens here -- where Delta
+   * classes are directly linkable -- rather than in the backend iterator API, 
mirroring
+   * `IcebergScanTransformer`. Splits without any DV keep the generic 
representation.
+   */
+  override def getSplitInfosFromPartitions(
+      partitions: Seq[(Partition, ReadFileFormat)]): Seq[SplitInfo] = {
+    val splitInfos = super.getSplitInfosFromPartitions(partitions)
+    val partitionColumnCount = getPartitionSchema.fields.length
+    splitInfos.zip(partitions).map {
+      case (localFiles: LocalFilesNode, (filePartition: FilePartition, _)) =>
+        DeltaDeletionVectorScanInfo
+          .normalize(partitionColumnCount, filePartition.files.toSeq)
+          .map {
+            case (otherMetadataColumns, deltaReadOptions) =>
+              DeltaLocalFilesBuilder.makeDeltaLocalFiles(
+                localFiles,
+                otherMetadataColumns.asJava,
+                deltaReadOptions.asJava): SplitInfo
+          }
+          .getOrElse(localFiles)
+      case (splitInfo, _) => splitInfo
     }
-
-    super.doValidateInternal()
   }
 
   override def doCanonicalize(): DeltaScanTransformer = {
@@ -119,7 +138,6 @@ case class DeltaScanTransformer(
 }
 
 object DeltaScanTransformer {
-
   def apply(scanExec: FileSourceScanExec): DeltaScanTransformer = {
     new DeltaScanTransformer(
       scanExec.relation,
diff --git 
a/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala
 
b/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala
index 421db09732..d984faf75b 100644
--- 
a/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala
+++ 
b/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala
@@ -16,16 +16,17 @@
  */
 package org.apache.gluten.extension
 
-import org.apache.gluten.execution.{DeltaScanTransformer, 
ProjectExecTransformer}
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.execution.{DeltaScanTransformer, 
FilterExecTransformerBase, ProjectExecTransformer}
 import org.apache.gluten.extension.columnar.transition.RemoveTransitions
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeReference, CreateNamedStruct, Expression, GetStructField, If, 
InputFileBlockLength, InputFileBlockStart, InputFileName, IsNull, 
LambdaFunction, Literal, NamedLambdaVariable}
+import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, 
AttributeReference, CreateNamedStruct, Expression, GetStructField, If, 
InputFileBlockLength, InputFileBlockStart, InputFileName, IsNull, 
LambdaFunction, Literal, NamedExpression, NamedLambdaVariable}
 import org.apache.spark.sql.catalyst.expressions.{ArrayTransform, 
TransformKeys, TransformValues}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaParquetFileFormat, 
NoMapping}
-import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.FileFormat
 import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
 
@@ -34,10 +35,20 @@ import scala.collection.mutable.ListBuffer
 
 object DeltaPostTransformRules {
   def rules: Seq[Rule[SparkPlan]] =
-    RemoveTransitions :: pushDownInputFileExprRule :: columnMappingRule :: Nil
+    RemoveTransitions ::
+      nativeDeletionVectorRule ::
+      pushDownInputFileExprRule ::
+      columnMappingRule :: Nil
+
+  private val deletionVectorDeletedRowColumnName = 
"__delta_internal_is_row_deleted"
+  private val deletionVectorRowIndexColumnName = "__delta_internal_row_index"
+  private val deletionVectorInternalColumnNames =
+    Set(deletionVectorDeletedRowColumnName, deletionVectorRowIndexColumnName)
 
   private val COLUMN_MAPPING_RULE_TAG: TreeNodeTag[String] =
     TreeNodeTag[String]("org.apache.gluten.delta.column.mapping")
+  private val PRESERVE_DELETION_VECTOR_ROW_INDEX_TAG: TreeNodeTag[Boolean] =
+    
TreeNodeTag[Boolean]("org.apache.gluten.delta.preserve.deletion.vector.row.index")
 
   private def notAppliedColumnMappingRule(plan: SparkPlan): Boolean = {
     plan.getTagValue(COLUMN_MAPPING_RULE_TAG).isEmpty
@@ -65,6 +76,101 @@ object DeltaPostTransformRules {
         child.copy(output = p.output)
     }
 
+  /**
+   * Spark Delta injects synthetic deletion-vector predicates and columns into 
the plan (via
+   * `PreprocessTableWithDVsStrategy`). Those drive the JVM reader path; for 
the native Delta scan
+   * path they must be stripped, since Velox applies the DV as a per-file mask 
from the split info
+   * and would otherwise double-apply it with incompatible semantics.
+   *
+   * {{{
+   * Input (scan offloaded, shape injected by Delta during planning):
+   *   ProjectExecTransformer [key, value]
+   *   +- FilterExecTransformer (__delta_internal_is_row_deleted = 0)
+   *      +- DeltaScanTransformer [key, value, __delta_internal_is_row_deleted]
+   *
+   * Output:
+   *   DeltaScanTransformer [key, value]
+   * }}}
+   *
+   * `__delta_internal_row_index` is preserved when an upstream operator still 
references it.
+   * Non-offloaded scans are untouched and keep vanilla JVM filtering.
+   */
+  val nativeDeletionVectorRule: Rule[SparkPlan] = (plan: SparkPlan) => {
+    tagRowIndexRequiredSubtrees(plan)
+    plan.transformUp {
+      case scan: DeltaScanTransformer =>
+        val cleanedDataFilters = 
scan.dataFilters.flatMap(stripDeletionVectorPredicate)
+        val cleanedPushDownFilters =
+          scan.pushDownFilters.map(_.flatMap(stripDeletionVectorPredicate))
+        val preserveRowIndex = shouldPreserveDeletionVectorRowIndex(scan)
+        val cleanedOutput = stripDeletionVectorInternalOutput(scan.output, 
preserveRowIndex)
+        val cleanedRequiredSchema =
+          stripDeletionVectorInternalSchema(scan.requiredSchema, 
preserveRowIndex)
+        if (
+          cleanedDataFilters == scan.dataFilters &&
+          cleanedPushDownFilters == scan.pushDownFilters &&
+          cleanedOutput == scan.output &&
+          cleanedRequiredSchema == scan.requiredSchema
+        ) {
+          scan
+        } else {
+          scan.copy(
+            output = cleanedOutput,
+            requiredSchema = cleanedRequiredSchema,
+            dataFilters = cleanedDataFilters,
+            pushDownFilters = cleanedPushDownFilters)
+        }
+      case project: ProjectExecTransformer if 
containsNativeDeltaScan(project.child) =>
+        val cleanedProjectList = stripDeletionVectorInternalProjectList(
+          project.projectList,
+          shouldPreserveDeletionVectorRowIndex(project))
+        if (cleanedProjectList == project.projectList) {
+          project
+        } else if (cleanedProjectList.isEmpty) {
+          project.child
+        } else {
+          ProjectExecTransformer(cleanedProjectList, project.child)
+        }
+      case project: ProjectExec if containsNativeDeltaScan(project.child) =>
+        val cleanedProjectList = stripDeletionVectorInternalProjectList(
+          project.projectList,
+          shouldPreserveDeletionVectorRowIndex(project))
+        if (cleanedProjectList == project.projectList) {
+          project
+        } else if (cleanedProjectList.isEmpty) {
+          project.child
+        } else {
+          ProjectExec(cleanedProjectList, project.child)
+        }
+      case filter: FilterExecTransformerBase if 
containsNativeDeltaScan(filter.child) =>
+        stripDeletionVectorPredicate(filter.cond) match {
+          case Some(cleanCondition) if cleanCondition != filter.cond =>
+            BackendsApiManager.getSparkPlanExecApiInstance
+              .genFilterExecTransformer(cleanCondition, filter.child)
+          case Some(_) =>
+            filter
+          case None =>
+            filter.child
+        }
+      case filter: FilterExec if containsNativeDeltaScan(filter.child) =>
+        stripDeletionVectorPredicate(filter.condition) match {
+          case Some(cleanCondition) if cleanCondition != filter.condition =>
+            FilterExec(cleanCondition, filter.child)
+          case Some(_) =>
+            filter
+          case None =>
+            filter.child
+        }
+    }
+  }
+
+  private def containsNativeDeltaScan(plan: SparkPlan): Boolean = {
+    plan.exists {
+      case _: DeltaScanTransformer => true
+      case _ => false
+    }
+  }
+
   private def isDeltaColumnMappingFileFormat(fileFormat: FileFormat): Boolean 
= fileFormat match {
     case d: DeltaParquetFileFormat if d.columnMappingMode != NoMapping =>
       true
@@ -79,6 +185,82 @@ object DeltaPostTransformRules {
     }
   }
 
+  private def referencesDeletionVectorInternalColumn(expr: Expression): 
Boolean = {
+    expr.references.exists(attr => 
deletionVectorInternalColumnNames.contains(attr.name))
+  }
+
+  private def referencesDeletionVectorRowIndex(expr: Expression): Boolean = {
+    expr.references.exists(_.name == deletionVectorRowIndexColumnName)
+  }
+
+  private def tagRowIndexRequiredSubtrees(plan: SparkPlan): Unit = {
+    def tagSubtree(subtree: SparkPlan): Unit = {
+      subtree.foreach(_.setTagValue(PRESERVE_DELETION_VECTOR_ROW_INDEX_TAG, 
true))
+    }
+
+    def visit(node: SparkPlan): Unit = {
+      val shouldPreserveRowIndex =
+        node.expressions.exists(containsIncrementMetricExpr) ||
+          node.expressions.exists(referencesDeletionVectorRowIndex)
+      if (shouldPreserveRowIndex) {
+        node.children.foreach(tagSubtree)
+      }
+      node.children.foreach(visit)
+    }
+
+    visit(plan)
+  }
+
+  private def shouldPreserveDeletionVectorRowIndex(plan: SparkPlan): Boolean = 
{
+    plan.getTagValue(PRESERVE_DELETION_VECTOR_ROW_INDEX_TAG).contains(true) ||
+    plan.expressions.exists(containsIncrementMetricExpr) ||
+    plan.expressions.exists(referencesDeletionVectorRowIndex)
+  }
+
+  private def shouldStripDeletionVectorInternalColumn(
+      columnName: String,
+      preserveRowIndex: Boolean): Boolean = {
+    columnName == deletionVectorDeletedRowColumnName ||
+    (!preserveRowIndex && columnName == deletionVectorRowIndexColumnName)
+  }
+
+  private def stripDeletionVectorInternalOutput(
+      output: Seq[Attribute],
+      preserveRowIndex: Boolean): Seq[Attribute] = {
+    output.filterNot(attr => 
shouldStripDeletionVectorInternalColumn(attr.name, preserveRowIndex))
+  }
+
+  private def stripDeletionVectorInternalProjectList(
+      projectList: Seq[NamedExpression],
+      preserveRowIndex: Boolean): Seq[NamedExpression] = {
+    projectList.filterNot(
+      expr => shouldStripDeletionVectorInternalColumn(expr.name, 
preserveRowIndex))
+  }
+
+  private def stripDeletionVectorInternalSchema(
+      schema: StructType,
+      preserveRowIndex: Boolean): StructType = {
+    StructType(
+      schema.filterNot(
+        field => shouldStripDeletionVectorInternalColumn(field.name, 
preserveRowIndex)))
+  }
+
+  private def stripDeletionVectorPredicate(expr: Expression): 
Option[Expression] = {
+    expr match {
+      case And(left, right) =>
+        (stripDeletionVectorPredicate(left), 
stripDeletionVectorPredicate(right)) match {
+          case (Some(cleanLeft), Some(cleanRight)) => Some(And(cleanLeft, 
cleanRight))
+          case (Some(cleanLeft), None) => Some(cleanLeft)
+          case (None, Some(cleanRight)) => Some(cleanRight)
+          case (None, None) => None
+        }
+      case other if referencesDeletionVectorInternalColumn(other) =>
+        None
+      case other =>
+        Some(other)
+    }
+  }
+
   private def isInputFileRelatedAttribute(attr: Attribute): Boolean = {
     attr match {
       case AttributeReference(name, _, _, _) =>
diff --git 
a/gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala
 
b/gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala
index 5fe1b4ba86..ebafb0c08c 100644
--- 
a/gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala
+++ 
b/gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala
@@ -17,16 +17,93 @@
 package org.apache.gluten.extension
 
 import org.apache.gluten.execution.DeltaScanTransformer
+import org.apache.gluten.extension.columnar.FallbackTags
 import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
 
 import org.apache.spark.sql.delta.DeltaParquetFileFormat
+import org.apache.spark.sql.delta.SnapshotDescriptor
+import 
org.apache.spark.sql.delta.commands.DeletionVectorUtils.deletionVectorsReadable
+import org.apache.spark.sql.delta.files.TahoeFileIndex
+import org.apache.spark.sql.delta.stats.PreparedDeltaFileIndex
 import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
+import org.apache.spark.util.SparkVersionUtil
 
 case class OffloadDeltaScan() extends OffloadSingleNode {
+  private val DeletionVectorsUseMetadataRowIndexKey =
+    "spark.databricks.delta.deletionVectors.useMetadataRowIndex"
+
   override def offload(plan: SparkPlan): SparkPlan = plan match {
+    case scan: FileSourceScanExec if isDeltaLogScan(scan) =>
+      FallbackTags.add(scan, "fallback Delta _delta_log scan")
+      scan
+    case scan: FileSourceScanExec if 
shouldFallbackSpark34DeletionVectorScan(scan) =>
+      FallbackTags.add(scan, "fallback Spark 3.4 Delta DV scan")
+      scan
     case scan: FileSourceScanExec
-        if scan.relation.fileFormat.getClass == 
classOf[DeltaParquetFileFormat] =>
+        if shouldFallbackDeletionVectorScanWithoutMetadataRowIndex(scan) =>
+      FallbackTags.add(scan, "fallback Delta DV scan without metadata row 
index")
+      scan
+    case scan: FileSourceScanExec if isDeltaScan(scan) =>
       DeltaScanTransformer(scan)
     case other => other
   }
+
+  private def isDeltaScan(scan: FileSourceScanExec): Boolean = {
+    isDeltaFileIndex(scan) || isDeltaParquetScan(scan)
+  }
+
+  private def isDeltaParquetScan(scan: FileSourceScanExec): Boolean = {
+    val fileFormatClass = scan.relation.fileFormat.getClass
+    fileFormatClass == classOf[DeltaParquetFileFormat] ||
+    fileFormatClass.getSimpleName == "GlutenDeltaParquetFileFormat"
+  }
+
+  private def isDeltaFileIndex(scan: FileSourceScanExec): Boolean = {
+    scan.relation.location.isInstanceOf[TahoeFileIndex] ||
+    scan.relation.location.isInstanceOf[PreparedDeltaFileIndex]
+  }
+
+  private def isDeltaLogScan(scan: FileSourceScanExec): Boolean = {
+    scan.relation.location.rootPaths.exists {
+      path =>
+        val root = path.toString
+        root.contains("/_delta_log") || root.contains("\\_delta_log") || 
root.endsWith("_delta_log")
+    }
+  }
+
+  private def shouldFallbackSpark34DeletionVectorScan(scan: 
FileSourceScanExec): Boolean = {
+    if (SparkVersionUtil.gteSpark35) {
+      return false
+    }
+
+    containsDeletionVector(scan)
+  }
+
+  private def shouldFallbackDeletionVectorScanWithoutMetadataRowIndex(
+      scan: FileSourceScanExec): Boolean = {
+    if (!SparkVersionUtil.gteSpark35) {
+      return false
+    }
+
+    // Delta DML tests force this path and rely on Spark's injected
+    // row-index filter column for correctness. Keep it on Spark until the 
native path can
+    // prove the same contract for DML-generated DVs.
+    val useMetadataRowIndex =
+      scan.relation.sparkSession.sessionState.conf
+        .getConfString(DeletionVectorsUseMetadataRowIndexKey, "true")
+        .toBoolean
+    !useMetadataRowIndex && containsDeletionVector(scan)
+  }
+
+  private def containsDeletionVector(scan: FileSourceScanExec): Boolean = {
+    scan.relation.location match {
+      case preparedIndex: PreparedDeltaFileIndex =>
+        preparedIndex.preparedScan.files.exists(_.deletionVector != null)
+      case index: TahoeFileIndex =>
+        val snapshot = index.asInstanceOf[SnapshotDescriptor]
+        deletionVectorsReadable(snapshot.protocol, snapshot.metadata)
+      case _ =>
+        false
+    }
+  }
 }
diff --git 
a/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala 
b/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala
index 5f88e18203..c138bed1cd 100644
--- a/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala
+++ b/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala
@@ -19,6 +19,7 @@ package org.apache.gluten.execution
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types._
+import org.apache.spark.util.SparkVersionUtil
 
 import scala.collection.JavaConverters._
 
@@ -37,6 +38,7 @@ abstract class DeltaSuite extends WholeStageTransformerSuite {
       .set("spark.memory.offHeap.size", "2g")
       .set("spark.unsafe.exceptionOnMemoryLeak", "true")
       .set("spark.sql.autoBroadcastJoinThreshold", "-1")
+      .set("spark.sql.ansi.enabled", "false")
       .set("spark.sql.sources.useV1SourceList", "avro")
       .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
       .set("spark.sql.catalog.spark_catalog", 
"org.apache.spark.sql.delta.catalog.DeltaCatalog")
@@ -422,12 +424,16 @@ abstract class DeltaSuite extends 
WholeStageTransformerSuite {
           s"ALTER TABLE delta.`$path` SET TBLPROPERTIES 
('delta.enableDeletionVectors' = true)")
         checkAnswer(spark.read.format("delta").load(path), df1.union(df2))
         spark.sql(s"DELETE FROM delta.`$path` WHERE id IN 
(${values2.mkString(", ")})")
-        import org.apache.spark.sql.execution.GlutenImplicits._
         val df = spark.read.format("delta").load(path)
-        assert(
-          df.fallbackSummary.fallbackNodeToReason
-            .flatMap(_.values)
-            .exists(_.contains("Deletion vector is not supported in native")))
+        val executedPlan = df.queryExecution.executedPlan
+        if (SparkVersionUtil.gteSpark35) {
+          assert(executedPlan.collect { case _: DeltaScanTransformer => true 
}.nonEmpty)
+          val planText = executedPlan.toString()
+          assert(!planText.contains("__delta_internal_is_row_deleted"))
+          assert(!planText.contains("__delta_internal_row_index"))
+        } else {
+          assert(executedPlan.collect { case _: DeltaScanTransformer => true 
}.isEmpty)
+        }
         checkAnswer(df, df1)
     }
   }
@@ -533,13 +539,13 @@ abstract class DeltaSuite extends 
WholeStageTransformerSuite {
     withSQLConf("spark.gluten.sql.columnar.scanOnly" -> "true") {
       withTable("delta_pf") {
         spark.sql(s"""
-                     |create table test (id int, name string) using delta
+                     |create table delta_pf (id int, name string) using delta
                      |""".stripMargin)
         spark.sql(s"""
-                     |insert into test values (1, "v1"), (2, "v2"), (3, "v1"), 
(4, "v2")
+                     |insert into delta_pf values (1, "v1"), (2, "v2"), (3, 
"v1"), (4, "v2")
                      |""".stripMargin)
         runQueryAndCompare(
-          "select id from test where name > 'v1'",
+          "select id from delta_pf where name > 'v1'",
           compareResult = true,
           noFallBack = false) {
           df =>
diff --git 
a/gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala
 
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/DeltaLocalFilesBuilder.java
similarity index 54%
copy from 
gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala
copy to 
gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/DeltaLocalFilesBuilder.java
index 5fe1b4ba86..c448dca508 100644
--- 
a/gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala
+++ 
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/DeltaLocalFilesBuilder.java
@@ -14,19 +14,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.extension
+package org.apache.gluten.substrait.rel;
 
-import org.apache.gluten.execution.DeltaScanTransformer
-import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
+import 
org.apache.gluten.substrait.rel.DeltaLocalFilesNode.DeltaFileReadOptions;
 
-import org.apache.spark.sql.delta.DeltaParquetFileFormat
-import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
+import java.util.List;
+import java.util.Map;
 
-case class OffloadDeltaScan() extends OffloadSingleNode {
-  override def offload(plan: SparkPlan): SparkPlan = plan match {
-    case scan: FileSourceScanExec
-        if scan.relation.fileFormat.getClass == 
classOf[DeltaParquetFileFormat] =>
-      DeltaScanTransformer(scan)
-    case other => other
+public class DeltaLocalFilesBuilder {
+  private DeltaLocalFilesBuilder() {}
+
+  /**
+   * Decorates a generically built {@link LocalFilesNode} with per-file Delta 
read options,
+   * replacing its extra metadata with the Delta-normalized variant.
+   */
+  public static DeltaLocalFilesNode makeDeltaLocalFiles(
+      LocalFilesNode base,
+      List<Map<String, Object>> otherMetadataColumns,
+      List<DeltaFileReadOptions> deltaReadOptions) {
+    return new DeltaLocalFilesNode(base, otherMetadataColumns, 
deltaReadOptions);
   }
 }
diff --git 
a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/DeltaLocalFilesNode.java
 
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/DeltaLocalFilesNode.java
new file mode 100644
index 0000000000..f79486947a
--- /dev/null
+++ 
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/DeltaLocalFilesNode.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.substrait.rel;
+
+import com.google.protobuf.ByteString;
+import io.substrait.proto.ReadRel;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class DeltaLocalFilesNode extends LocalFilesNode {
+  private final List<DeltaFileReadOptions> deltaReadOptions = new 
ArrayList<>();
+
+  DeltaLocalFilesNode(
+      LocalFilesNode base,
+      List<Map<String, Object>> otherMetadataColumns,
+      List<DeltaFileReadOptions> deltaReadOptions) {
+    super(base, otherMetadataColumns);
+    if (deltaReadOptions == null || deltaReadOptions.size() != 
getPaths().size()) {
+      throw new IllegalArgumentException(
+          String.format(
+              "deltaReadOptions must contain one entry per file path, expected 
%d but got %s",
+              getPaths().size(),
+              deltaReadOptions == null ? "null" : 
String.valueOf(deltaReadOptions.size())));
+    }
+    this.deltaReadOptions.addAll(deltaReadOptions);
+  }
+
+  @Override
+  protected void processFileBuilder(ReadRel.LocalFiles.FileOrFiles.Builder 
fileBuilder, int index) {
+    DeltaFileReadOptions options = deltaReadOptions.get(index);
+    ReadRel.LocalFiles.FileOrFiles.DeltaReadOptions.Builder deltaBuilder =
+        ReadRel.LocalFiles.FileOrFiles.DeltaReadOptions.newBuilder()
+            
.setRowIndexFilterType(toProtoRowIndexFilterType(options.rowIndexFilterType()))
+            .setHasDeletionVector(options.hasDeletionVector());
+
+    if (options.hasDeletionVector()) {
+      deltaBuilder
+          .setDeletionVectorCardinality(options.deletionVectorCardinality())
+          
.setSerializedDeletionVector(ByteString.copyFrom(options.serializedDeletionVector()));
+    }
+
+    fileBuilder.setDelta(deltaBuilder.build());
+  }
+
+  private static 
ReadRel.LocalFiles.FileOrFiles.DeltaReadOptions.RowIndexFilterType
+      toProtoRowIndexFilterType(RowIndexFilterType rowIndexFilterType) {
+    switch (rowIndexFilterType) {
+      case IF_CONTAINED:
+        return 
ReadRel.LocalFiles.FileOrFiles.DeltaReadOptions.RowIndexFilterType.IF_CONTAINED;
+      case IF_NOT_CONTAINED:
+        return 
ReadRel.LocalFiles.FileOrFiles.DeltaReadOptions.RowIndexFilterType.IF_NOT_CONTAINED;
+      case KEEP_ALL:
+      default:
+        return 
ReadRel.LocalFiles.FileOrFiles.DeltaReadOptions.RowIndexFilterType.KEEP_ALL;
+    }
+  }
+
+  public enum RowIndexFilterType {
+    KEEP_ALL,
+    IF_CONTAINED,
+    IF_NOT_CONTAINED
+  }
+
+  public static class DeltaFileReadOptions implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final RowIndexFilterType rowIndexFilterType;
+    private final boolean hasDeletionVector;
+    private final long deletionVectorCardinality;
+    private final byte[] serializedDeletionVector;
+
+    public DeltaFileReadOptions(
+        RowIndexFilterType rowIndexFilterType,
+        boolean hasDeletionVector,
+        long deletionVectorCardinality,
+        byte[] serializedDeletionVector) {
+      this.rowIndexFilterType = rowIndexFilterType;
+      this.hasDeletionVector = hasDeletionVector;
+      this.deletionVectorCardinality = deletionVectorCardinality;
+      this.serializedDeletionVector =
+          serializedDeletionVector == null ? new byte[0] : 
serializedDeletionVector;
+    }
+
+    public RowIndexFilterType rowIndexFilterType() {
+      return rowIndexFilterType;
+    }
+
+    public boolean hasDeletionVector() {
+      return hasDeletionVector;
+    }
+
+    public long deletionVectorCardinality() {
+      return deletionVectorCardinality;
+    }
+
+    public byte[] serializedDeletionVector() {
+      return serializedDeletionVector;
+    }
+  }
+}
diff --git 
a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
 
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
index bc42dd302e..8a79351ad2 100644
--- 
a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
+++ 
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
@@ -93,6 +93,27 @@ public class LocalFilesNode implements SplitInfo {
     this.iterAsInput = true;
   }
 
+  /**
+   * Copies an existing node, replacing its per-file extra metadata. Lets 
data-lake subclasses
+   * decorate a generically built node without re-deriving the file listing.
+   */
+  protected LocalFilesNode(LocalFilesNode other, List<Map<String, Object>> 
otherMetadataColumns) {
+    this.index = other.index;
+    this.paths.addAll(other.paths);
+    this.starts.addAll(other.starts);
+    this.lengths.addAll(other.lengths);
+    this.fileSizes.addAll(other.fileSizes);
+    this.modificationTimes.addAll(other.modificationTimes);
+    this.partitionColumns.addAll(other.partitionColumns);
+    this.metadataColumns.addAll(other.metadataColumns);
+    this.fileFormat = other.fileFormat;
+    this.preferredLocations.addAll(other.preferredLocations);
+    this.fileReadProperties = other.fileReadProperties;
+    this.iterAsInput = other.iterAsInput;
+    this.fileSchema = other.fileSchema;
+    this.otherMetadataColumns.addAll(otherMetadataColumns);
+  }
+
   public List<String> getPaths() {
     return paths;
   }
diff --git 
a/gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto 
b/gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto
index 2bfb68e097..02c7f4cc5c 100644
--- 
a/gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto
+++ 
b/gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto
@@ -197,6 +197,18 @@ message ReadRel {
         repeated DeleteFile delete_files = 3;
       }
 
+      message DeltaReadOptions {
+        enum RowIndexFilterType {
+          KEEP_ALL = 0;
+          IF_CONTAINED = 1;
+          IF_NOT_CONTAINED = 2;
+        }
+        RowIndexFilterType row_index_filter_type = 1;
+        bool has_deletion_vector = 2;
+        uint64 deletion_vector_cardinality = 3;
+        bytes serialized_deletion_vector = 4;
+      }
+
       // File reading options
       oneof file_format {
         ParquetReadOptions parquet = 9;
@@ -207,6 +219,7 @@ message ReadRel {
         TextReadOptions text = 14;
         JsonReadOptions json = 15;
         IcebergReadOptions iceberg = 16;
+        DeltaReadOptions delta = 22;
       }
 
       message partitionColumn {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to