This is an automated email from the ASF dual-hosted git repository.
zhztheplayer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new f75b869131 [VL][Delta] Add JVM Delta DV scan handoff (#12269)
f75b869131 is described below
commit f75b869131e957701145b19bc859c47272b8de74
Author: Mohammad Linjawi <[email protected]>
AuthorDate: Fri Jun 12 16:47:51 2026 +0300
[VL][Delta] Add JVM Delta DV scan handoff (#12269)
---
.../gluten/component/VeloxDeltaComponent.scala | 4 +
.../delta/DeltaDeletionVectorHandoffSuite.scala | 68 ++++++++
.../org/apache/spark/sql/delta/DeltaSuite.scala | 28 +++
.../delta/DeltaDeletionVectorHandoffSuite.scala | 98 +++++++++++
cpp/velox/compute/VeloxPlanConverter.cc | 94 ++++++++++
cpp/velox/compute/WholeStageResultIterator.cc | 69 +++++++-
cpp/velox/compute/delta/DeltaSplitInfo.h | 46 +++++
cpp/velox/substrait/SubstraitToVeloxPlan.cc | 13 +-
.../delta/DeltaDeletionVectorScanInfo.scala} | 21 +--
.../delta/DeltaDeletionVectorScanInfo.scala} | 21 +--
.../delta/DeltaDeletionVectorScanInfo.scala} | 21 +--
.../gluten/delta/DeltaDeletionVectorScanInfo.scala | 57 ++++++-
.../gluten/delta/DeltaDeletionVectorScanInfo.scala | 57 ++++++-
.../gluten/execution/DeltaScanTransformer.scala | 40 +++--
.../gluten/extension/DeltaPostTransformRules.scala | 190 ++++++++++++++++++++-
.../apache/gluten/extension/OffloadDeltaScan.scala | 79 ++++++++-
.../org/apache/gluten/execution/DeltaSuite.scala | 22 ++-
.../substrait/rel/DeltaLocalFilesBuilder.java | 27 +--
.../gluten/substrait/rel/DeltaLocalFilesNode.java | 117 +++++++++++++
.../gluten/substrait/rel/LocalFilesNode.java | 21 +++
.../substrait/proto/substrait/algebra.proto | 13 ++
21 files changed, 1013 insertions(+), 93 deletions(-)
diff --git
a/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala
b/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala
index 0587e8b07f..164cf52886 100644
---
a/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala
+++
b/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala
@@ -36,6 +36,10 @@ class VeloxDeltaComponent extends Component {
override def injectRules(injector: Injector): Unit = {
val legacy = injector.gluten.legacy
+ // Deletion-vector scans need no Gluten-side logical preprocessing:
Delta's own
+ // PreprocessTableWithDVsStrategy injects the skip-row column and filter
during physical
+ // planning, DeltaPostTransformRules.nativeDeletionVectorRule strips them
when the scan
+ // offloads, and DeltaScanTransformer materializes the per-file DV
payloads for Velox.
legacy.injectTransform {
c =>
val offload = Seq(OffloadDeltaScan(), OffloadDeltaProject(),
OffloadDeltaFilter())
diff --git
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaDeletionVectorHandoffSuite.scala
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaDeletionVectorHandoffSuite.scala
new file mode 100644
index 0000000000..f5510a9525
--- /dev/null
+++
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaDeletionVectorHandoffSuite.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta
+
+import org.apache.gluten.execution.DeltaScanTransformer
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.delta.test.{DeltaSQLCommandTest, DeltaSQLTestUtils}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.tags.ExtendedSQLTest
+
+import org.apache.hadoop.fs.Path
+
+/**
+ * Verifies the Delta 3.3 / Spark 3.5 deletion-vector scan handoff: a scan over a file carrying a
+ * deletion vector is offloaded to [[DeltaScanTransformer]], Delta's internal DV columns do not
+ * appear in the executed plan, and the rows deleted through the DV are filtered out.
+ */
+@ExtendedSQLTest
+class DeltaDeletionVectorHandoffSuite
+  extends QueryTest
+  with SharedSparkSession
+  with DeltaSQLTestUtils
+  with DeltaSQLCommandTest {
+
+  import testImplicits._
+
+  test("Spark 3.5 Delta DV scan handoff should filter deleted rows") {
+    withTempDir {
+      tempDir =>
+        val path = tempDir.getCanonicalPath
+        Seq((1, "a"), (2, "b"), (3, "c"), (4, "d"))
+          .toDF("id", "value")
+          .coalesce(1)
+          .write
+          .format("delta")
+          .save(path)
+
+        spark.sql(
+          s"ALTER TABLE delta.`$path` SET TBLPROPERTIES ('delta.enableDeletionVectors' = true)")
+        spark.sql(s"DELETE FROM delta.`$path` WHERE id IN (3, 4)")
+
+        val log = DeltaLog.forTable(spark, new Path(path))
+        // Fail with a descriptive message instead of a bare NoSuchElementException from `.get`.
+        val dataFile = log.update().allFiles.collect().find(_.deletionVector != null).getOrElse {
+          fail(s"Expected the DELETE on $path to produce a file with a deletion vector")
+        }
+        assert(dataFile.deletionVector.cardinality == 2L)
+
+        val df = spark.read.format("delta").load(path)
+        val executedPlan = df.queryExecution.executedPlan
+        assert(executedPlan.collect { case _: DeltaScanTransformer => true }.nonEmpty)
+        val planText = executedPlan.toString()
+        assert(!planText.contains("__delta_internal_is_row_deleted"))
+        assert(!planText.contains("__delta_internal_row_index"))
+        checkAnswer(df, Seq((1, "a"), (2, "b")).toDF())
+    }
+  }
+}
diff --git
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala
index f265168ddb..a60054031c 100644
---
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala
+++
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala
@@ -197,6 +197,34 @@ class DeltaSuite
checkAnswer(data.toDF(), Row(1) :: Row(2) :: Row(3) :: Row(4) :: Row(5) ::
Row(6) :: Nil)
}
+  test("DV scan without metadata row index falls back and stays correct") {
+    withTempDir {
+      dir =>
+        val tablePath = dir.getCanonicalPath
+        val rows = Seq((1, "a"), (2, "b"), (3, "c"), (4, "d"))
+        rows.toDF("id", "value").coalesce(1).write.format("delta").save(tablePath)
+
+        spark.sql(
+          s"ALTER TABLE delta.`$tablePath` SET TBLPROPERTIES ('delta.enableDeletionVectors' = true)")
+
+        // Both the DELETE and the read run with the Parquet metadata row index disabled; in that
+        // configuration the scan is expected to stay on vanilla Spark and still return live rows.
+        withSQLConf(DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX.key -> "false") {
+          spark.sql(s"DELETE FROM delta.`$tablePath` WHERE id IN (3, 4)")
+
+          val snapshotFiles = DeltaLog.forTable(spark, new Path(tablePath)).update().allFiles
+          assert(snapshotFiles.collect().exists(_.deletionVector != null))
+
+          val result = spark.read.format("delta").load(tablePath)
+          val offloadedScans = result.queryExecution.executedPlan.collect {
+            case scan: DeltaScanTransformer => scan
+          }
+          assert(offloadedScans.isEmpty)
+          checkAnswer(result, Seq(Row(1, "a"), Row(2, "b")))
+        }
+    }
+  }
+
test("partitioned append - nulls") {
val tempDir = Utils.createTempDir()
Seq(Some(1), None)
diff --git
a/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaDeletionVectorHandoffSuite.scala
b/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaDeletionVectorHandoffSuite.scala
new file mode 100644
index 0000000000..dda547b015
--- /dev/null
+++
b/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaDeletionVectorHandoffSuite.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta
+
+import org.apache.gluten.execution.DeltaScanTransformer
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.test.{DeltaSQLCommandTest, DeltaSQLTestUtils}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.tags.ExtendedSQLTest
+
+import org.apache.hadoop.fs.Path
+
+/**
+ * Covers the Delta 4.0 / Spark 4 deletion-vector scan handoff. A DV-bearing scan is offloaded to
+ * [[DeltaScanTransformer]] with Delta's internal DV columns absent from the plan. With the Parquet
+ * metadata row index disabled, the same scan stays on vanilla Spark. Both must return only live rows.
+ */
+@ExtendedSQLTest
+class DeltaDeletionVectorHandoffSuite
+  extends QueryTest
+  with SharedSparkSession
+  with DeltaSQLTestUtils
+  with DeltaSQLCommandTest {
+
+  import testImplicits._
+
+  /**
+   * Writes a single-file Delta table with ids 1..4 to `path`, enables deletion vectors and deletes
+   * ids 3 and 4, so the surviving data file carries a deletion vector.
+   *
+   * @param path directory the table is written to
+   * @return the first file of the latest snapshot that has a deletion vector; the test fails if
+   *         none exists
+   */
+  private def createTableWithDeletedRows(path: String) = {
+    Seq((1, "a"), (2, "b"), (3, "c"), (4, "d"))
+      .toDF("id", "value")
+      .coalesce(1)
+      .write
+      .format("delta")
+      .save(path)
+
+    spark.sql(
+      s"ALTER TABLE delta.`$path` SET TBLPROPERTIES ('delta.enableDeletionVectors' = true)")
+    spark.sql(s"DELETE FROM delta.`$path` WHERE id IN (3, 4)")
+
+    val log = DeltaLog.forTable(spark, new Path(path))
+    log.update().allFiles.collect().find(_.deletionVector != null).getOrElse {
+      fail(s"Expected the DELETE on $path to produce a file with a deletion vector")
+    }
+  }
+
+  test("Spark 4 Delta DV scan should fall back when metadata row index is disabled") {
+    withTempDir {
+      tempDir =>
+        val path = tempDir.getCanonicalPath
+        createTableWithDeletedRows(path)
+
+        // This covers scan behavior over an existing DV. Keep the no-metadata-row-index
+        // path on Spark until the native path can prove the same contract for DML DVs.
+        withSQLConf(DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX.key -> "false") {
+          val df = spark.read.format("delta").load(path)
+          val executedPlan = df.queryExecution.executedPlan
+          assert(executedPlan.collect { case _: DeltaScanTransformer => true }.isEmpty)
+          checkAnswer(df, Seq((1, "a"), (2, "b")).toDF())
+        }
+    }
+  }
+
+  test("Spark 4 Delta DV scan handoff should filter deleted rows") {
+    withTempDir {
+      tempDir =>
+        val path = tempDir.getCanonicalPath
+        val dataFile = createTableWithDeletedRows(path)
+        assert(dataFile.deletionVector.cardinality == 2L)
+
+        val df = spark.read.format("delta").load(path)
+        val executedPlan = df.queryExecution.executedPlan
+        assert(executedPlan.collect { case _: DeltaScanTransformer => true }.nonEmpty)
+        val planText = executedPlan.toString()
+        assert(!planText.contains("__delta_internal_is_row_deleted"))
+        assert(!planText.contains("__delta_internal_row_index"))
+        checkAnswer(df, Seq((1, "a"), (2, "b")).toDF())
+    }
+  }
+}
diff --git a/cpp/velox/compute/VeloxPlanConverter.cc
b/cpp/velox/compute/VeloxPlanConverter.cc
index f3ffab59a6..f47ca61050 100644
--- a/cpp/velox/compute/VeloxPlanConverter.cc
+++ b/cpp/velox/compute/VeloxPlanConverter.cc
@@ -17,8 +17,13 @@
#include "VeloxPlanConverter.h"
#include <filesystem>
+#include <iomanip>
+#include <limits>
+#include <optional>
+#include <sstream>
+#include <google/protobuf/any.pb.h>
+#include <google/protobuf/wrappers.pb.h>
#include "config/GlutenConfig.h"
+#include "delta/DeltaSplitInfo.h"
#include "iceberg/IcebergPlanConverter.h"
#include "operators/plannodes/IteratorSplit.h"
@@ -48,6 +53,87 @@ VeloxPlanConverter::VeloxPlanConverter(
}
namespace {
+// Converts a metadata value packed as one of the protobuf wrapper types (bytes, string, int32,
+// int64, double, bool) into the string form stored in SplitInfo::metadataColumns. Returns
+// std::nullopt for any other packed type; parseScanSplitInfo then skips that entry.
+std::optional<std::string> unpackMetadataValue(const google::protobuf::Any& value) {
+  google::protobuf::BytesValue bytesValue;
+  if (value.UnpackTo(&bytesValue)) {
+    return bytesValue.value();
+  }
+
+  google::protobuf::StringValue stringValue;
+  if (value.UnpackTo(&stringValue)) {
+    return stringValue.value();
+  }
+
+  google::protobuf::Int32Value int32Value;
+  if (value.UnpackTo(&int32Value)) {
+    return std::to_string(int32Value.value());
+  }
+
+  google::protobuf::Int64Value int64Value;
+  if (value.UnpackTo(&int64Value)) {
+    return std::to_string(int64Value.value());
+  }
+
+  google::protobuf::DoubleValue doubleValue;
+  if (value.UnpackTo(&doubleValue)) {
+    // std::to_string(double) uses fixed notation with six fractional digits and so truncates
+    // small magnitudes (1e-7 becomes "0.000000"). Printing max_digits10 significant digits
+    // guarantees the string parses back to the identical double.
+    std::ostringstream out;
+    out << std::setprecision(std::numeric_limits<double>::max_digits10) << doubleValue.value();
+    return out.str();
+  }
+
+  // The JVM side normally packs booleans through SubstraitUtil.convertJavaObjectToAny's toString
+  // fallback, so they arrive as StringValue "true"/"false" above. A BoolValue is mapped to the
+  // same spelling so both encodings yield identical metadata strings.
+  google::protobuf::BoolValue boolValue;
+  if (value.UnpackTo(&boolValue)) {
+    return boolValue.value() ? "true" : "false";
+  }
+
+  return std::nullopt;
+}
+
+// Maps the substrait Delta row-index filter value onto the native filter type:
+// 0 -> kKeepAll, 1 -> kIfContained, 2 -> kIfNotContained. Any other value is rejected instead of
+// being treated as "keep all", because silently keeping every row would return rows that a
+// deletion vector marks as deleted.
+delta::DeltaRowIndexFilterType parseDeltaRowIndexFilterType(int filterType) {
+  switch (filterType) {
+    case 0:
+      return delta::DeltaRowIndexFilterType::kKeepAll;
+    case 1:
+      return delta::DeltaRowIndexFilterType::kIfContained;
+    case 2:
+      return delta::DeltaRowIndexFilterType::kIfNotContained;
+    default:
+      VELOX_USER_FAIL("Unsupported Delta row index filter type: {}", filterType);
+  }
+}
+
+// Records the current file's Delta read options on a DeltaSplitInfo. On the first Delta file of
+// a split, the generic SplitInfo is upgraded to a DeltaSplitInfo. Each call appends exactly one
+// row-index filter type and one deletion-vector entry (std::nullopt when the file has none), so
+// both per-file vectors stay index-aligned with SplitInfo::paths.
+std::shared_ptr<DeltaSplitInfo> parseDeltaSplitInfo(
+    const substrait::ReadRel_LocalFiles_FileOrFiles& file,
+    std::shared_ptr<SplitInfo> splitInfo) {
+  // Cast once. Later files of the same split reuse the DeltaSplitInfo created for the first one.
+  auto deltaSplitInfo = std::dynamic_pointer_cast<DeltaSplitInfo>(splitInfo);
+  if (deltaSplitInfo == nullptr) {
+    deltaSplitInfo = std::make_shared<DeltaSplitInfo>(*splitInfo);
+  }
+
+  deltaSplitInfo->format = dwio::common::FileFormat::PARQUET;
+  const auto& deltaReadOptions = file.delta();
+  deltaSplitInfo->rowIndexFilterTypes.emplace_back(
+      parseDeltaRowIndexFilterType(deltaReadOptions.row_index_filter_type()));
+
+  if (!deltaReadOptions.has_deletion_vector()) {
+    deltaSplitInfo->deletionVectors.emplace_back(std::nullopt);
+    return deltaSplitInfo;
+  }
+
+  const auto& serializedPayload = deltaReadOptions.serialized_deletion_vector();
+  VELOX_USER_CHECK(
+      !serializedPayload.empty(), "Delta split has a deletion vector without a serialized payload");
+  VELOX_USER_CHECK_LE(
+      serializedPayload.size(),
+      static_cast<size_t>(std::numeric_limits<int32_t>::max()),
+      "Delta deletion vector serialized payload is too large");
+  const auto cardinality = static_cast<uint64_t>(deltaReadOptions.deletion_vector_cardinality());
+
+  // The descriptor only stores a pointer/size view of the bytes. deletionVectorPayloads owns them
+  // for as long as the DeltaSplitInfo lives.
+  auto payload = std::make_shared<std::string>(serializedPayload);
+  const SplitPayloadBufferView payloadView{
+      reinterpret_cast<const uint8_t*>(payload->data()), static_cast<int32_t>(payload->size())};
+  deltaSplitInfo->deletionVectors.emplace_back(
+      delta::DeltaDeletionVectorDescriptor::serialized(cardinality, payloadView));
+  deltaSplitInfo->deletionVectorPayloads.emplace_back(std::move(payload));
+  return deltaSplitInfo;
+}
+
std::shared_ptr<SplitInfo> parseScanSplitInfo(
const facebook::velox::config::ConfigBase* veloxCfg,
const
google::protobuf::RepeatedPtrField<substrait::ReadRel_LocalFiles_FileOrFiles>&
fileList) {
@@ -75,6 +161,11 @@ std::shared_ptr<SplitInfo> parseScanSplitInfo(
for (const auto& metadataColumn : file.metadata_columns()) {
metadataColumnMap[metadataColumn.key()] = metadataColumn.value();
}
+ for (const auto& otherMetadataColumn :
file.other_const_metadata_columns()) {
+ if (auto unpackedValue =
unpackMetadataValue(otherMetadataColumn.value())) {
+ metadataColumnMap[otherMetadataColumn.key()] =
std::move(*unpackedValue);
+ }
+ }
splitInfo->metadataColumns.emplace_back(metadataColumnMap);
splitInfo->paths.emplace_back(file.uri_file());
@@ -103,6 +194,9 @@ std::shared_ptr<SplitInfo> parseScanSplitInfo(
case SubstraitFileFormatCase::kIceberg:
splitInfo = IcebergPlanConverter::parseIcebergSplitInfo(file,
std::move(splitInfo));
break;
+ case SubstraitFileFormatCase::kDelta:
+ splitInfo = parseDeltaSplitInfo(file, std::move(splitInfo));
+ break;
default:
splitInfo->format = dwio::common::FileFormat::UNKNOWN;
break;
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc
b/cpp/velox/compute/WholeStageResultIterator.cc
index c3ac095cdc..753ccb2a68 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -15,9 +15,13 @@
* limitations under the License.
*/
#include "WholeStageResultIterator.h"
+#include <optional>
#include "VeloxBackend.h"
#include "VeloxPlanConverter.h"
#include "VeloxRuntime.h"
+#include "compute/delta/DeltaConnector.h"
+#include "compute/delta/DeltaSplit.h"
+#include "compute/delta/DeltaSplitInfo.h"
#include "config/VeloxConfig.h"
#include "utils/ConfigExtractor.h"
#include "velox/connectors/hive/HiveConfig.h"
@@ -69,6 +73,36 @@ const std::string kWriteIOTime = "writeIOWallNanos";
// others
const std::string kHiveDefaultPartition = "__HIVE_DEFAULT_PARTITION__";
+const std::string kDeltaTableFormat = "delta";
+
+// Depth-first search of the plan tree, visiting sources in order, for the node whose id equals
+// nodeId. Returns that node as a TableScanNode. Returns nullptr when no TableScanNode with that
+// id is found. The subtree below a matching node that is not a TableScanNode is not searched,
+// but its siblings still are.
+const velox::core::TableScanNode* findTableScanNodeById(
+    const std::shared_ptr<const velox::core::PlanNode>& planNode,
+    const velox::core::PlanNodeId& nodeId) {
+  std::vector<const velox::core::PlanNode*> pending{planNode.get()};
+  while (!pending.empty()) {
+    const velox::core::PlanNode* current = pending.back();
+    pending.pop_back();
+    if (current == nullptr) {
+      continue;
+    }
+
+    if (current->id() == nodeId) {
+      if (const auto* scan = dynamic_cast<const velox::core::TableScanNode*>(current)) {
+        return scan;
+      }
+      // A matching non-scan node ends the search of this subtree only.
+      continue;
+    }
+
+    const auto& children = current->sources();
+    // Push in reverse so the first source is popped, and therefore visited, first.
+    for (auto child = children.rbegin(); child != children.rend(); ++child) {
+      pending.push_back(child->get());
+    }
+  }
+  return nullptr;
+}
+
+// Returns the connector id of the TableScanNode identified by nodeId, or an empty string when the
+// plan has no such scan node.
+std::string connectorIdForScanNode(
+    const std::shared_ptr<const velox::core::PlanNode>& planNode,
+    const velox::core::PlanNodeId& nodeId) {
+  const auto* scan = findTableScanNodeById(planNode, nodeId);
+  return scan != nullptr ? std::string(scan->tableHandle()->connectorId()) : std::string();
+}
} // namespace
@@ -134,7 +168,8 @@ WholeStageResultIterator::WholeStageResultIterator(
throw std::runtime_error("Invalid scan information.");
}
- for (const auto& scanInfo : scanInfos) {
+ for (size_t scanInfoIdx = 0; scanInfoIdx < scanInfos.size(); ++scanInfoIdx) {
+ const auto& scanInfo = scanInfos[scanInfoIdx];
// Get the information for TableScan.
// Partition index in scan info is not used.
const auto& paths = scanInfo->paths;
@@ -144,6 +179,9 @@ WholeStageResultIterator::WholeStageResultIterator(
const auto& format = scanInfo->format;
const auto& partitionColumns = scanInfo->partitionColumns;
const auto& metadataColumns = scanInfo->metadataColumns;
+ const auto scanNodeConnectorId = connectorIdForScanNode(veloxPlan_,
scanNodeIds_[scanInfoIdx]);
+ const auto deltaSplitInfo =
std::dynamic_pointer_cast<DeltaSplitInfo>(scanInfo);
+ const bool isDeltaScan = scanNodeConnectorId == connectorIds_.delta ||
deltaSplitInfo != nullptr;
#ifdef GLUTEN_ENABLE_GPU
// Under the pre-condition that all the split infos has same partition
column and format.
const auto canUseCudfConnector = scanInfo->canUseCudfConnector();
@@ -177,10 +215,37 @@ WholeStageResultIterator::WholeStageResultIterator(
deleteFiles,
metadataColumn,
properties[idx]);
+ } else if (isDeltaScan) {
+ std::unordered_map<std::string, std::string>
customSplitInfo{{"table_format", kDeltaTableFormat}};
+ std::optional<gluten::delta::DeltaDeletionVectorDescriptor>
deletionVector = std::nullopt;
+ auto rowIndexFilterType =
gluten::delta::DeltaRowIndexFilterType::kKeepAll;
+ if (deltaSplitInfo != nullptr) {
+ VELOX_USER_CHECK_LT(idx, deltaSplitInfo->deletionVectors.size());
+ VELOX_USER_CHECK_LT(idx, deltaSplitInfo->rowIndexFilterTypes.size());
+ deletionVector = deltaSplitInfo->deletionVectors[idx];
+ rowIndexFilterType = deltaSplitInfo->rowIndexFilterTypes[idx];
+ }
+ split = std::make_shared<gluten::delta::HiveDeltaSplit>(
+ connectorIds_.delta,
+ paths[idx],
+ format,
+ starts[idx],
+ lengths[idx],
+ partitionKeys,
+ std::nullopt,
+ customSplitInfo,
+ nullptr,
+ std::unordered_map<std::string, std::string>(),
+ true,
+ deletionVector,
+ std::nullopt,
+ rowIndexFilterType,
+ metadataColumn,
+ properties[idx]);
} else {
auto connectorId = connectorIds_.hive;
#ifdef GLUTEN_ENABLE_GPU
- if (canUseCudfConnector && enableCudf_ &&
+ if (connectorId == connectorIds_.hive && canUseCudfConnector &&
enableCudf_ &&
veloxCfg_->get<bool>(kCudfEnableTableScan,
kCudfEnableTableScanDefault)) {
connectorId = connectorIds_.cudfHive;
}
diff --git a/cpp/velox/compute/delta/DeltaSplitInfo.h
b/cpp/velox/compute/delta/DeltaSplitInfo.h
new file mode 100644
index 0000000000..c02e52f6b8
--- /dev/null
+++ b/cpp/velox/compute/delta/DeltaSplitInfo.h
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include "compute/delta/DeltaSplit.h"
+#include "substrait/SubstraitToVeloxPlan.h"
+
+namespace gluten {
+
+// SplitInfo for a split containing Delta files. In addition to the generic per-file data, it
+// carries a row-index filter type and an optional deletion-vector descriptor for each file.
+// Both vectors are indexed like SplitInfo::paths.
+struct DeltaSplitInfo : SplitInfo {
+  // Owns the serialized deletion-vector bytes that descriptors in deletionVectors point into.
+  std::vector<std::shared_ptr<std::string>> deletionVectorPayloads;
+  // One entry per file; std::nullopt when the file has no deletion vector.
+  std::vector<std::optional<delta::DeltaDeletionVectorDescriptor>> deletionVectors;
+  // One entry per file.
+  std::vector<delta::DeltaRowIndexFilterType> rowIndexFilterTypes;
+
+  // Upgrades a generic SplitInfo whose most recently appended path is the first Delta file.
+  // Every earlier path gets a placeholder entry (no deletion vector, kKeepAll) so the per-file
+  // vectors stay aligned with paths; the caller appends the entries for the current file.
+  // Explicit so that a SplitInfo never converts to a DeltaSplitInfo implicitly.
+  explicit DeltaSplitInfo(const SplitInfo& splitInfo) : SplitInfo(splitInfo) {
+    deletionVectors.reserve(splitInfo.paths.capacity());
+    deletionVectorPayloads.reserve(splitInfo.paths.capacity());
+    rowIndexFilterTypes.reserve(splitInfo.paths.capacity());
+
+    const auto previousFileCount = splitInfo.paths.empty() ? 0 : splitInfo.paths.size() - 1;
+    deletionVectors.resize(previousFileCount, std::nullopt);
+    rowIndexFilterTypes.resize(previousFileCount, delta::DeltaRowIndexFilterType::kKeepAll);
+  }
+};
+
+} // namespace gluten
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index 73eef45245..2d7e65734b 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -19,6 +19,8 @@
#include "TypeUtils.h"
#include "VariantToVectorConverter.h"
+#include "compute/delta/DeltaConnector.h"
+#include "compute/delta/DeltaSplitInfo.h"
#include "jni/JniHashTable.h"
#include "operators/hashjoin/HashTableBuilder.h"
#include "operators/plannodes/RowVectorStream.h"
@@ -58,6 +60,12 @@ bool useCudfTableHandle(const
std::vector<std::shared_ptr<SplitInfo>>& splitInfo
#endif
}
+// A scan counts as a Delta scan when its split info came out of the substrait Delta file-format
+// branch of split parsing, i.e. when it is a DeltaSplitInfo.
+bool isDeltaSplitInfo(const std::shared_ptr<SplitInfo>& splitInfo) {
+  return dynamic_cast<const DeltaSplitInfo*>(splitInfo.get()) != nullptr;
+}
+
core::SortOrder toSortOrder(const ::substrait::SortField& sortField) {
switch (sortField.direction()) {
case ::substrait::SortField_SortDirection_SORT_DIRECTION_ASC_NULLS_FIRST:
@@ -1575,8 +1583,9 @@ core::PlanNodePtr
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
connector::ConnectorTableHandlePtr tableHandle;
auto remainingFilter = readRel.has_filter() ?
exprConverter_->toVeloxExpr(readRel.filter(), baseSchema) : nullptr;
- auto connectorId = connectorIds_.hive;
- if (useCudfTableHandle(splitInfos_) &&
veloxCfg_->get<bool>(kCudfEnableTableScan, kCudfEnableTableScanDefault) &&
+ auto connectorId = isDeltaSplitInfo(splitInfo) ? connectorIds_.delta :
connectorIds_.hive;
+ if (connectorId == connectorIds_.hive && useCudfTableHandle(splitInfos_) &&
+ veloxCfg_->get<bool>(kCudfEnableTableScan, kCudfEnableTableScanDefault)
&&
veloxCfg_->get<bool>(kCudfEnabled, kCudfEnabledDefault)) {
#ifdef GLUTEN_ENABLE_GPU
connectorId = connectorIds_.cudfHive;
diff --git
a/gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala
b/gluten-delta/src-delta20/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
similarity index 57%
copy from
gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala
copy to
gluten-delta/src-delta20/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
index 5fe1b4ba86..af8e8df7da 100644
---
a/gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala
+++
b/gluten-delta/src-delta20/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
@@ -14,19 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.extension
+package org.apache.gluten.delta
-import org.apache.gluten.execution.DeltaScanTransformer
-import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
+import org.apache.gluten.substrait.rel.DeltaLocalFilesNode.DeltaFileReadOptions
-import org.apache.spark.sql.delta.DeltaParquetFileFormat
-import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.datasources.PartitionedFile
-case class OffloadDeltaScan() extends OffloadSingleNode {
- override def offload(plan: SparkPlan): SparkPlan = plan match {
- case scan: FileSourceScanExec
- if scan.relation.fileFormat.getClass ==
classOf[DeltaParquetFileFormat] =>
- DeltaScanTransformer(scan)
- case other => other
- }
+import java.util.{Map => JMap}
+
+/**
+ * Delta releases before 3.3 cannot hand deletion vectors to the native reader, so this shim never
+ * produces DV read options and callers always keep the generic split representation.
+ */
+object DeltaDeletionVectorScanInfo {
+  def normalize(partitionColumnCount: Int, partitionFiles: Seq[PartitionedFile])
+    : Option[(Seq[JMap[String, Object]], Seq[DeltaFileReadOptions])] = Option.empty
 }
diff --git
a/gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala
b/gluten-delta/src-delta23/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
similarity index 57%
copy from
gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala
copy to
gluten-delta/src-delta23/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
index 5fe1b4ba86..af8e8df7da 100644
---
a/gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala
+++
b/gluten-delta/src-delta23/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
@@ -14,19 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.extension
+package org.apache.gluten.delta
-import org.apache.gluten.execution.DeltaScanTransformer
-import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
+import org.apache.gluten.substrait.rel.DeltaLocalFilesNode.DeltaFileReadOptions
-import org.apache.spark.sql.delta.DeltaParquetFileFormat
-import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.datasources.PartitionedFile
-case class OffloadDeltaScan() extends OffloadSingleNode {
- override def offload(plan: SparkPlan): SparkPlan = plan match {
- case scan: FileSourceScanExec
- if scan.relation.fileFormat.getClass ==
classOf[DeltaParquetFileFormat] =>
- DeltaScanTransformer(scan)
- case other => other
- }
+import java.util.{Map => JMap}
+
+/**
+ * Delta releases before 3.3 cannot hand deletion vectors to the native reader, so this shim never
+ * produces DV read options and callers always keep the generic split representation.
+ */
+object DeltaDeletionVectorScanInfo {
+  def normalize(partitionColumnCount: Int, partitionFiles: Seq[PartitionedFile])
+    : Option[(Seq[JMap[String, Object]], Seq[DeltaFileReadOptions])] = Option.empty
 }
diff --git
a/gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala
b/gluten-delta/src-delta24/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
similarity index 57%
copy from
gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala
copy to
gluten-delta/src-delta24/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
index 5fe1b4ba86..af8e8df7da 100644
---
a/gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala
+++
b/gluten-delta/src-delta24/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
@@ -14,19 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.extension
+package org.apache.gluten.delta
-import org.apache.gluten.execution.DeltaScanTransformer
-import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
+import org.apache.gluten.substrait.rel.DeltaLocalFilesNode.DeltaFileReadOptions
-import org.apache.spark.sql.delta.DeltaParquetFileFormat
-import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.datasources.PartitionedFile
-case class OffloadDeltaScan() extends OffloadSingleNode {
- override def offload(plan: SparkPlan): SparkPlan = plan match {
- case scan: FileSourceScanExec
- if scan.relation.fileFormat.getClass ==
classOf[DeltaParquetFileFormat] =>
- DeltaScanTransformer(scan)
- case other => other
- }
+import java.util.{Map => JMap}
+
+/**
+ * Delta releases before 3.3 cannot hand deletion vectors to the native reader, so this shim never
+ * produces DV read options and callers always keep the generic split representation.
+ */
+object DeltaDeletionVectorScanInfo {
+  def normalize(partitionColumnCount: Int, partitionFiles: Seq[PartitionedFile])
+    : Option[(Seq[JMap[String, Object]], Seq[DeltaFileReadOptions])] = Option.empty
 }
diff --git
a/backends-velox/src-delta33/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
b/gluten-delta/src-delta33/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
similarity index 76%
rename from
backends-velox/src-delta33/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
rename to
gluten-delta/src-delta33/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
index 8ada3d755c..263d0ffb35 100644
---
a/backends-velox/src-delta33/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
+++
b/gluten-delta/src-delta33/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
@@ -17,9 +17,11 @@
package org.apache.gluten.delta
import org.apache.gluten.sql.shims.SparkShimLoader
+import org.apache.gluten.substrait.rel.DeltaLocalFilesNode
+import org.apache.gluten.substrait.rel.DeltaLocalFilesNode.DeltaFileReadOptions
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.delta.GlutenDeltaParquetFileFormat
+import org.apache.spark.sql.delta.DeltaParquetFileFormat
import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
import org.apache.spark.sql.delta.deletionvectors.{RoaringBitmapArrayFormat,
StoredBitmap}
import org.apache.spark.sql.delta.storage.dv.HadoopFileSystemDVStore
@@ -27,7 +29,7 @@ import
org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.hadoop.fs.Path
-import java.util.{ArrayList => JArrayList}
+import java.util.{Map => JMap}
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
@@ -51,9 +53,27 @@ object DeltaDeletionVectorScanInfo {
deletionVectorInfo: DeletionVectorInfo)
private val RowIndexFilterIdEncoded =
- GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED
+ DeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED
private val RowIndexFilterTypeKey =
- GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE
+ DeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE
+
+  /**
+   * Builds the per-file Delta deletion-vector read options for one split. Alongside them it
+   * returns each file's other-constant metadata with the DV bookkeeping keys removed. When no
+   * file in the split carries a deletion vector, the result is None and callers keep the generic
+   * split representation.
+   *
+   * @param partitionColumnCount forwarded to `extractAll`
+   * @param partitionFiles the files of the split
+   * @return index-aligned per-file metadata maps and read options, or None
+   */
+  def normalize(partitionColumnCount: Int, partitionFiles: Seq[PartitionedFile])
+    : Option[(Seq[JMap[String, Object]], Seq[DeltaFileReadOptions])] = {
+    val fileInfos = extractAll(activeSparkSession, partitionColumnCount, partitionFiles)
+    val anyDeletionVector = fileInfos.exists(_.deletionVectorInfo.hasDeletionVector)
+    if (!anyDeletionVector) {
+      None
+    } else {
+      val metadata = fileInfos.map(_.normalizedOtherMetadataColumns.asJava)
+      val readOptions = fileInfos.map(info => toDeltaFileReadOptions(info.deletionVectorInfo))
+      Some((metadata, readOptions))
+    }
+  }
def extract(
spark: SparkSession,
@@ -72,11 +92,30 @@ object DeltaDeletionVectorScanInfo {
files.map(extract(spark, partitionColumnCount, _))
}
- def extractAllFromJava(
- spark: SparkSession,
- partitionColumnCount: Int,
- files: java.util.List[PartitionedFile]):
java.util.List[PartitionFileScanInfo] = {
- new JArrayList(extractAll(spark, partitionColumnCount,
files.asScala.toSeq).asJava)
+  /** Converts one file's extracted DV info into its Substrait-facing read options. */
+  private def toDeltaFileReadOptions(dvInfo: DeletionVectorInfo): DeltaFileReadOptions = {
+    val substraitFilterType = toSubstraitRowIndexFilterType(dvInfo.rowIndexFilterType)
+    new DeltaFileReadOptions(
+      substraitFilterType,
+      dvInfo.hasDeletionVector,
+      dvInfo.cardinality,
+      dvInfo.serializedDeletionVector)
+  }
+
+  /**
+   * Translates Delta's row-index filter type into [[DeltaLocalFilesNode.RowIndexFilterType]].
+   * IF_CONTAINED and IF_NOT_CONTAINED map one-to-one; every other value maps to KEEP_ALL.
+   *
+   * NOTE(review): if a file has a deletion vector but its filter type lands in the KEEP_ALL
+   * branch, the native reader would presumably keep the deleted rows -- confirm that
+   * extractDeletionVectorInfo never produces that combination.
+   */
+  private def toSubstraitRowIndexFilterType(
+      filterType: RowIndexFilterType): DeltaLocalFilesNode.RowIndexFilterType = {
+    import DeltaLocalFilesNode.{RowIndexFilterType => SubstraitFilterType}
+    filterType match {
+      case IF_CONTAINED => SubstraitFilterType.IF_CONTAINED
+      case IF_NOT_CONTAINED => SubstraitFilterType.IF_NOT_CONTAINED
+      case _ => SubstraitFilterType.KEEP_ALL
+    }
+  }
+
+  /**
+   * Session used to materialize deletion vectors: the active session, falling back to the
+   * default session.
+   *
+   * @throws IllegalStateException if neither session exists
+   */
+  private def activeSparkSession: SparkSession = {
+    val candidate = SparkSession.getActiveSession.orElse(SparkSession.getDefaultSession)
+    candidate.getOrElse(
+      throw new IllegalStateException(
+        "Active SparkSession is required to materialize Delta deletion vectors"))
}
private def extractDeletionVectorInfo(
diff --git
a/backends-velox/src-delta40/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
b/gluten-delta/src-delta40/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
similarity index 77%
rename from
backends-velox/src-delta40/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
rename to
gluten-delta/src-delta40/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
index 69a5dc3078..cddc8849fc 100644
---
a/backends-velox/src-delta40/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
+++
b/gluten-delta/src-delta40/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
@@ -17,9 +17,11 @@
package org.apache.gluten.delta
import org.apache.gluten.sql.shims.SparkShimLoader
+import org.apache.gluten.substrait.rel.DeltaLocalFilesNode
+import org.apache.gluten.substrait.rel.DeltaLocalFilesNode.DeltaFileReadOptions
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.delta.GlutenDeltaParquetFileFormat
+import org.apache.spark.sql.delta.DeltaParquetFileFormat
import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
import org.apache.spark.sql.delta.deletionvectors.{RoaringBitmapArrayFormat,
StoredBitmap}
import org.apache.spark.sql.delta.storage.dv.HadoopFileSystemDVStore
@@ -27,7 +29,7 @@ import
org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.hadoop.fs.Path
-import java.util.{ArrayList => JArrayList}
+import java.util.{Map => JMap}
import scala.collection.JavaConverters._
import scala.util.Try
@@ -52,9 +54,27 @@ object DeltaDeletionVectorScanInfo {
deletionVectorInfo: DeletionVectorInfo)
private val RowIndexFilterIdEncoded =
- GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED
+    DeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED
private val RowIndexFilterTypeKey =
- GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE
+    DeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE
+
+  /**
+   * Computes, for every file of one split, its Delta deletion-vector read options together with
+   * its metadata map stripped of the DV bookkeeping keys.
+   *
+   * @param partitionColumnCount number of partition columns of the scanned relation
+   * @param partitionFiles the files that make up the split
+   * @return the per-file metadata maps and read options, in file order, or `None` when no file of
+   *         the split has a deletion vector (the caller then keeps the generic split)
+   * @throws IllegalStateException if neither an active nor a default SparkSession exists
+   */
+  def normalize(partitionColumnCount: Int, partitionFiles: Seq[PartitionedFile])
+      : Option[(Seq[JMap[String, Object]], Seq[DeltaFileReadOptions])] = {
+    val fileInfos = extractAll(activeSparkSession, partitionColumnCount, partitionFiles)
+    val splitHasDeletionVector = fileInfos.exists(_.deletionVectorInfo.hasDeletionVector)
+    if (!splitHasDeletionVector) {
+      None
+    } else {
+      val metadataPerFile: Seq[JMap[String, Object]] =
+        fileInfos.map(fileInfo => fileInfo.normalizedOtherMetadataColumns.asJava)
+      val readOptionsPerFile: Seq[DeltaFileReadOptions] =
+        fileInfos.map(fileInfo => toDeltaFileReadOptions(fileInfo.deletionVectorInfo))
+      Some((metadataPerFile, readOptionsPerFile))
+    }
+  }
def extract(
spark: SparkSession,
@@ -73,11 +93,30 @@ object DeltaDeletionVectorScanInfo {
files.map(extract(spark, partitionColumnCount, _))
}
- def extractAllFromJava(
- spark: SparkSession,
- partitionColumnCount: Int,
- files: java.util.List[PartitionedFile]):
java.util.List[PartitionFileScanInfo] = {
- new JArrayList(extractAll(spark, partitionColumnCount,
files.asScala.toSeq).asJava)
+  /** Converts one file's extracted DV info into its Substrait-facing read options. */
+  private def toDeltaFileReadOptions(dvInfo: DeletionVectorInfo): DeltaFileReadOptions = {
+    val substraitFilterType = toSubstraitRowIndexFilterType(dvInfo.rowIndexFilterType)
+    new DeltaFileReadOptions(
+      substraitFilterType,
+      dvInfo.hasDeletionVector,
+      dvInfo.cardinality,
+      dvInfo.serializedDeletionVector)
+  }
+
+  /**
+   * Translates Delta's row-index filter type into [[DeltaLocalFilesNode.RowIndexFilterType]].
+   * IF_CONTAINED and IF_NOT_CONTAINED map one-to-one; every other value maps to KEEP_ALL.
+   *
+   * NOTE(review): if a file has a deletion vector but its filter type lands in the KEEP_ALL
+   * branch, the native reader would presumably keep the deleted rows -- confirm that
+   * extractDeletionVectorInfo never produces that combination.
+   */
+  private def toSubstraitRowIndexFilterType(
+      filterType: RowIndexFilterType): DeltaLocalFilesNode.RowIndexFilterType = {
+    import DeltaLocalFilesNode.{RowIndexFilterType => SubstraitFilterType}
+    filterType match {
+      case IF_CONTAINED => SubstraitFilterType.IF_CONTAINED
+      case IF_NOT_CONTAINED => SubstraitFilterType.IF_NOT_CONTAINED
+      case _ => SubstraitFilterType.KEEP_ALL
+    }
+  }
+
+  /**
+   * Session used to materialize deletion vectors: the active session, falling back to the
+   * default session.
+   *
+   * @throws IllegalStateException if neither session exists
+   */
+  private def activeSparkSession: SparkSession = {
+    val candidate = SparkSession.getActiveSession.orElse(SparkSession.getDefaultSession)
+    candidate.getOrElse(
+      throw new IllegalStateException(
+        "Active SparkSession is required to materialize Delta deletion vectors"))
}
private def extractDeletionVectorInfo(
diff --git
a/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala
b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala
index 154b735554..6ac644622d 100644
---
a/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala
+++
b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala
@@ -16,19 +16,24 @@
*/
package org.apache.gluten.execution
+import org.apache.gluten.delta.DeltaDeletionVectorScanInfo
import org.apache.gluten.sql.shims.SparkShimLoader
+import org.apache.gluten.substrait.rel.{DeltaLocalFilesBuilder,
LocalFilesNode, SplitInfo}
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
+import org.apache.spark.Partition
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, Expression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.connector.read.streaming.SparkDataStream
import org.apache.spark.sql.delta.{DeltaParquetFileFormat, NoMapping}
import org.apache.spark.sql.execution.FileSourceScanExec
-import org.apache.spark.sql.execution.datasources.HadoopFsRelation
+import org.apache.spark.sql.execution.datasources.{FilePartition,
HadoopFsRelation}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.collection.BitSet
+import scala.collection.JavaConverters._
+
case class DeltaScanTransformer(
@transient override val relation: HadoopFsRelation,
@transient stream: Option[SparkDataStream],
@@ -84,16 +89,30 @@ case class DeltaScanTransformer(
case _ => dataFilters
}
- override protected def doValidateInternal(): ValidationResult = {
- if (
- requiredSchema.fields.exists(
- _.name == "__delta_internal_is_row_deleted") ||
requiredSchema.fields.exists(
- _.name == "__delta_internal_row_index")
- ) {
- return ValidationResult.failed(s"Deletion vector is not supported in
native.")
+  /**
+   * Attaches per-file Delta deletion-vector read options to the split infos produced by the
+   * generic file-scan path, so the native Delta reader can apply the DV itself. The Delta-specific
+   * extraction happens here, where Delta classes are directly linkable, rather than in the backend
+   * iterator API -- the same approach `IcebergScanTransformer` takes. Splits that are not
+   * `LocalFilesNode`s, and splits none of whose files carries a DV, are returned unchanged.
+   */
+  override def getSplitInfosFromPartitions(
+      partitions: Seq[(Partition, ReadFileFormat)]): Seq[SplitInfo] = {
+    val genericSplits = super.getSplitInfosFromPartitions(partitions)
+    val partitionColumnCount = getPartitionSchema.fields.length
+    // `zip` pairs splits and partitions positionally; this presumes the superclass emits one split
+    // per partition, in partition order -- TODO confirm.
+    genericSplits.zip(partitions).map {
+      case (localFiles: LocalFilesNode, (filePartition: FilePartition, _)) =>
+        DeltaDeletionVectorScanInfo
+          .normalize(partitionColumnCount, filePartition.files.toSeq)
+          .fold[SplitInfo](localFiles) {
+            case (otherMetadataColumns, deltaReadOptions) =>
+              DeltaLocalFilesBuilder.makeDeltaLocalFiles(
+                localFiles,
+                otherMetadataColumns.asJava,
+                deltaReadOptions.asJava)
+          }
+      case (untouched, _) => untouched
}
-
- super.doValidateInternal()
}
override def doCanonicalize(): DeltaScanTransformer = {
@@ -119,7 +138,6 @@ case class DeltaScanTransformer(
}
object DeltaScanTransformer {
-
def apply(scanExec: FileSourceScanExec): DeltaScanTransformer = {
new DeltaScanTransformer(
scanExec.relation,
diff --git
a/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala
b/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala
index 421db09732..d984faf75b 100644
---
a/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala
+++
b/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala
@@ -16,16 +16,17 @@
*/
package org.apache.gluten.extension
-import org.apache.gluten.execution.{DeltaScanTransformer,
ProjectExecTransformer}
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.execution.{DeltaScanTransformer,
FilterExecTransformerBase, ProjectExecTransformer}
import org.apache.gluten.extension.columnar.transition.RemoveTransitions
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute,
AttributeReference, CreateNamedStruct, Expression, GetStructField, If,
InputFileBlockLength, InputFileBlockStart, InputFileName, IsNull,
LambdaFunction, Literal, NamedLambdaVariable}
+import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute,
AttributeReference, CreateNamedStruct, Expression, GetStructField, If,
InputFileBlockLength, InputFileBlockStart, InputFileName, IsNull,
LambdaFunction, Literal, NamedExpression, NamedLambdaVariable}
import org.apache.spark.sql.catalyst.expressions.{ArrayTransform,
TransformKeys, TransformValues}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaParquetFileFormat,
NoMapping}
-import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
@@ -34,10 +35,20 @@ import scala.collection.mutable.ListBuffer
object DeltaPostTransformRules {
+ // Post-transform rules contributed for Delta plans (presumably applied in list order by the
+ // caller); the DV rule is placed right after RemoveTransitions.
def rules: Seq[Rule[SparkPlan]] =
- RemoveTransitions :: pushDownInputFileExprRule :: columnMappingRule :: Nil
+ RemoveTransitions ::
+ nativeDeletionVectorRule ::
+ pushDownInputFileExprRule ::
+ columnMappingRule :: Nil
+
+ // Names of the synthetic deletion-vector columns. They are matched by attribute / field name
+ // only, never by expression id.
+ private val deletionVectorDeletedRowColumnName =
"__delta_internal_is_row_deleted"
+ private val deletionVectorRowIndexColumnName = "__delta_internal_row_index"
+ private val deletionVectorInternalColumnNames =
+ Set(deletionVectorDeletedRowColumnName, deletionVectorRowIndexColumnName)
private val COLUMN_MAPPING_RULE_TAG: TreeNodeTag[String] =
TreeNodeTag[String]("org.apache.gluten.delta.column.mapping")
+ // Set by tagRowIndexRequiredSubtrees on nodes below an operator that still needs the DV row
+ // index; read by shouldPreserveDeletionVectorRowIndex.
+ private val PRESERVE_DELETION_VECTOR_ROW_INDEX_TAG: TreeNodeTag[Boolean] =
+
TreeNodeTag[Boolean]("org.apache.gluten.delta.preserve.deletion.vector.row.index")
private def notAppliedColumnMappingRule(plan: SparkPlan): Boolean = {
plan.getTagValue(COLUMN_MAPPING_RULE_TAG).isEmpty
@@ -65,6 +76,101 @@ object DeltaPostTransformRules {
child.copy(output = p.output)
}
+ /**
+ * Spark Delta injects synthetic deletion-vector predicates and columns into
the plan (via
+ * `PreprocessTableWithDVsStrategy`). Those drive the JVM reader path; for
the native Delta scan
+ * path they must be stripped, since Velox applies the DV as a per-file mask
from the split info
+ * and would otherwise double-apply it with incompatible semantics.
+ *
+ * {{{
+ * Input (scan offloaded, shape injected by Delta during planning):
+ * ProjectExecTransformer [key, value]
+ * +- FilterExecTransformer (__delta_internal_is_row_deleted = 0)
+ * +- DeltaScanTransformer [key, value, __delta_internal_is_row_deleted]
+ *
+ * Output:
+ * DeltaScanTransformer [key, value]
+ * }}}
+ *
+ * `__delta_internal_row_index` is preserved when an upstream operator still
references it.
+ * Non-offloaded scans are untouched and keep vanilla JVM filtering.
+ *
+ * Projects and filters (native or vanilla) are rewritten only when a DeltaScanTransformer
+ * appears somewhere in their child subtree (see containsNativeDeltaScan). NOTE(review): that
+ * search is not limited to the direct child, so an operator above e.g. a join with one native
+ * and one JVM Delta side is rewritten too -- confirm such operators never carry DV columns or
+ * predicates that the JVM side still needs.
+ */
+ val nativeDeletionVectorRule: Rule[SparkPlan] = (plan: SparkPlan) => {
+ // Tag before rewriting, so shouldPreserveDeletionVectorRowIndex can see which nodes sit below
+ // an operator that still needs the row index.
+ tagRowIndexRequiredSubtrees(plan)
+ plan.transformUp {
+ // Native scan: drop DV predicates from both filter lists and DV columns from the output and
+ // required schema. The node is only copied when something actually changed.
+ case scan: DeltaScanTransformer =>
+ val cleanedDataFilters =
scan.dataFilters.flatMap(stripDeletionVectorPredicate)
+ val cleanedPushDownFilters =
+ scan.pushDownFilters.map(_.flatMap(stripDeletionVectorPredicate))
+ val preserveRowIndex = shouldPreserveDeletionVectorRowIndex(scan)
+ val cleanedOutput = stripDeletionVectorInternalOutput(scan.output,
preserveRowIndex)
+ val cleanedRequiredSchema =
+ stripDeletionVectorInternalSchema(scan.requiredSchema,
preserveRowIndex)
+ if (
+ cleanedDataFilters == scan.dataFilters &&
+ cleanedPushDownFilters == scan.pushDownFilters &&
+ cleanedOutput == scan.output &&
+ cleanedRequiredSchema == scan.requiredSchema
+ ) {
+ scan
+ } else {
+ scan.copy(
+ output = cleanedOutput,
+ requiredSchema = cleanedRequiredSchema,
+ dataFilters = cleanedDataFilters,
+ pushDownFilters = cleanedPushDownFilters)
+ }
+ // Projects: remove DV columns from the project list. A list that becomes empty collapses to
+ // the child. NOTE(review): the child's output may then be wider than the original (empty)
+ // project output -- confirm parents tolerate that.
+ case project: ProjectExecTransformer if
containsNativeDeltaScan(project.child) =>
+ val cleanedProjectList = stripDeletionVectorInternalProjectList(
+ project.projectList,
+ shouldPreserveDeletionVectorRowIndex(project))
+ if (cleanedProjectList == project.projectList) {
+ project
+ } else if (cleanedProjectList.isEmpty) {
+ project.child
+ } else {
+ ProjectExecTransformer(cleanedProjectList, project.child)
+ }
+ case project: ProjectExec if containsNativeDeltaScan(project.child) =>
+ val cleanedProjectList = stripDeletionVectorInternalProjectList(
+ project.projectList,
+ shouldPreserveDeletionVectorRowIndex(project))
+ if (cleanedProjectList == project.projectList) {
+ project
+ } else if (cleanedProjectList.isEmpty) {
+ project.child
+ } else {
+ ProjectExec(cleanedProjectList, project.child)
+ }
+ // Filters: remove DV conjuncts. The filter is rebuilt only if its condition changed, and is
+ // dropped entirely when nothing remains.
+ case filter: FilterExecTransformerBase if
containsNativeDeltaScan(filter.child) =>
+ stripDeletionVectorPredicate(filter.cond) match {
+ case Some(cleanCondition) if cleanCondition != filter.cond =>
+ BackendsApiManager.getSparkPlanExecApiInstance
+ .genFilterExecTransformer(cleanCondition, filter.child)
+ case Some(_) =>
+ filter
+ case None =>
+ filter.child
+ }
+ case filter: FilterExec if containsNativeDeltaScan(filter.child) =>
+ stripDeletionVectorPredicate(filter.condition) match {
+ case Some(cleanCondition) if cleanCondition != filter.condition =>
+ FilterExec(cleanCondition, filter.child)
+ case Some(_) =>
+ filter
+ case None =>
+ filter.child
+ }
+ }
+ }
+
+  /** Whether `plan` itself, or any node beneath it, is a native [[DeltaScanTransformer]]. */
+  private def containsNativeDeltaScan(plan: SparkPlan): Boolean =
+    plan.collectFirst { case nativeScan: DeltaScanTransformer => nativeScan }.isDefined
+
private def isDeltaColumnMappingFileFormat(fileFormat: FileFormat): Boolean
= fileFormat match {
case d: DeltaParquetFileFormat if d.columnMappingMode != NoMapping =>
true
@@ -79,6 +185,82 @@ object DeltaPostTransformRules {
}
}
+  /** Whether `expr` reads either synthetic DV column; columns are matched by name only. */
+  private def referencesDeletionVectorInternalColumn(expr: Expression): Boolean =
+    expr.references.exists(attr => deletionVectorInternalColumnNames(attr.name))
+
+  /** Whether `expr` reads the synthetic DV row-index column; matched by name only. */
+  private def referencesDeletionVectorRowIndex(expr: Expression): Boolean =
+    expr.references.exists(attr => attr.name == deletionVectorRowIndexColumnName)
+
+  /**
+   * Whether one of the node's own expressions needs the DV row index: either
+   * `containsIncrementMetricExpr` holds for it, or it references the row-index column.
+   */
+  private def nodeNeedsDeletionVectorRowIndex(node: SparkPlan): Boolean =
+    node.expressions.exists(containsIncrementMetricExpr) ||
+      node.expressions.exists(referencesDeletionVectorRowIndex)
+
+  /**
+   * Visits every node of `plan` and, below each node that needs the DV row index, tags all
+   * descendants with PRESERVE_DELETION_VECTOR_ROW_INDEX_TAG.
+   */
+  private def tagRowIndexRequiredSubtrees(plan: SparkPlan): Unit = {
+    plan.foreach {
+      node =>
+        if (nodeNeedsDeletionVectorRowIndex(node)) {
+          node.children.foreach {
+            child => child.foreach(_.setTagValue(PRESERVE_DELETION_VECTOR_ROW_INDEX_TAG, true))
+          }
+        }
+    }
+  }
+
+  /** Keep the row index on `plan` if it was tagged, or its own expressions still need it. */
+  private def shouldPreserveDeletionVectorRowIndex(plan: SparkPlan): Boolean =
+    plan.getTagValue(PRESERVE_DELETION_VECTOR_ROW_INDEX_TAG).getOrElse(false) ||
+      nodeNeedsDeletionVectorRowIndex(plan)
+
+  /**
+   * The "is row deleted" column is always stripped; the row-index column only when it need not
+   * be preserved. Every other column is kept.
+   */
+  private def shouldStripDeletionVectorInternalColumn(
+      columnName: String,
+      preserveRowIndex: Boolean): Boolean = columnName match {
+    case `deletionVectorDeletedRowColumnName` => true
+    case `deletionVectorRowIndexColumnName` => !preserveRowIndex
+    case _ => false
+  }
+
+  /** `output` without the DV columns that have to be stripped. */
+  private def stripDeletionVectorInternalOutput(
+      output: Seq[Attribute],
+      preserveRowIndex: Boolean): Seq[Attribute] =
+    output.filter(attr => !shouldStripDeletionVectorInternalColumn(attr.name, preserveRowIndex))
+
+  /** `projectList` without the entries named after a DV column that has to be stripped. */
+  private def stripDeletionVectorInternalProjectList(
+      projectList: Seq[NamedExpression],
+      preserveRowIndex: Boolean): Seq[NamedExpression] =
+    projectList.filter(
+      named => !shouldStripDeletionVectorInternalColumn(named.name, preserveRowIndex))
+
+  /** `schema` without the fields named after a DV column that has to be stripped. */
+  private def stripDeletionVectorInternalSchema(
+      schema: StructType,
+      preserveRowIndex: Boolean): StructType = {
+    val keptFields = schema.fields.filter(
+      field => !shouldStripDeletionVectorInternalColumn(field.name, preserveRowIndex))
+    new StructType(keptFields)
+  }
+
+  /**
+   * Removes every DV-referencing predicate from `expr`, descending through `And` nodes only. A
+   * non-`And` expression that touches a DV column is dropped as a whole; `None` means nothing is
+   * left.
+   */
+  private def stripDeletionVectorPredicate(expr: Expression): Option[Expression] = expr match {
+    case And(left, right) =>
+      Seq(left, right)
+        .flatMap(stripDeletionVectorPredicate)
+        .reduceOption[Expression]((cleanLeft, cleanRight) => And(cleanLeft, cleanRight))
+    case _ if referencesDeletionVectorInternalColumn(expr) => None
+    case _ => Some(expr)
+  }
+
private def isInputFileRelatedAttribute(attr: Attribute): Boolean = {
attr match {
case AttributeReference(name, _, _, _) =>
diff --git
a/gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala
b/gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala
index 5fe1b4ba86..ebafb0c08c 100644
---
a/gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala
+++
b/gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala
@@ -17,16 +17,93 @@
package org.apache.gluten.extension
import org.apache.gluten.execution.DeltaScanTransformer
+import org.apache.gluten.extension.columnar.FallbackTags
import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
import org.apache.spark.sql.delta.DeltaParquetFileFormat
+import org.apache.spark.sql.delta.SnapshotDescriptor
+import
org.apache.spark.sql.delta.commands.DeletionVectorUtils.deletionVectorsReadable
+import org.apache.spark.sql.delta.files.TahoeFileIndex
+import org.apache.spark.sql.delta.stats.PreparedDeltaFileIndex
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
+import org.apache.spark.util.SparkVersionUtil
case class OffloadDeltaScan() extends OffloadSingleNode {
+  // Delta SQL conf key read by shouldFallbackDeletionVectorScanWithoutMetadataRowIndex; treated
+  // as "true" when unset.
+  private val DeletionVectorsUseMetadataRowIndexKey =
+    "spark.databricks.delta.deletionVectors.useMetadataRowIndex"
+
+  // Name of the directory that holds a Delta table's transaction log.
+  private val DeltaLogDirName = "_delta_log"
+
+  /**
+   * Tags for fallback (and returns unchanged) three kinds of scan:
+   *   - scans inside `_delta_log`;
+   *   - scans that may read deletion vectors on Spark < 3.5;
+   *   - DV scans with the metadata row index disabled.
+   * Other Delta scans become a [[DeltaScanTransformer]]. Every other plan is returned as is.
+   */
override def offload(plan: SparkPlan): SparkPlan = plan match {
+    case scan: FileSourceScanExec if isDeltaLogScan(scan) =>
+      FallbackTags.add(scan, "fallback Delta _delta_log scan")
+      scan
+    case scan: FileSourceScanExec if shouldFallbackSpark34DeletionVectorScan(scan) =>
+      FallbackTags.add(scan, "fallback Spark 3.4 Delta DV scan")
+      scan
case scan: FileSourceScanExec
- if scan.relation.fileFormat.getClass ==
classOf[DeltaParquetFileFormat] =>
+        if shouldFallbackDeletionVectorScanWithoutMetadataRowIndex(scan) =>
+      FallbackTags.add(scan, "fallback Delta DV scan without metadata row index")
+      scan
+    case scan: FileSourceScanExec if isDeltaScan(scan) =>
DeltaScanTransformer(scan)
case other => other
}
+
+  /** A Delta scan is recognised by a Tahoe file index or by a Delta parquet file format. */
+  private def isDeltaScan(scan: FileSourceScanExec): Boolean = {
+    isDeltaFileIndex(scan) || isDeltaParquetScan(scan)
+  }
+
+  private def isDeltaParquetScan(scan: FileSourceScanExec): Boolean = {
+    val fileFormatClass = scan.relation.fileFormat.getClass
+    // GlutenDeltaParquetFileFormat is matched by simple class name, presumably because it is not
+    // linkable from this module -- TODO confirm.
+    fileFormatClass == classOf[DeltaParquetFileFormat] ||
+    fileFormatClass.getSimpleName == "GlutenDeltaParquetFileFormat"
+  }
+
+  private def isDeltaFileIndex(scan: FileSourceScanExec): Boolean = {
+    scan.relation.location.isInstanceOf[TahoeFileIndex] ||
+    scan.relation.location.isInstanceOf[PreparedDeltaFileIndex]
+  }
+
+  /**
+   * True when a root path of the scan lies inside a `_delta_log` directory: the path itself or
+   * one of its ancestors is named exactly `_delta_log`. Whole path components are compared (not
+   * substrings), so table directories such as `events_delta_log` or `_delta_logs` are not
+   * mistaken for transaction-log scans and forced onto the fallback path.
+   */
+  private def isDeltaLogScan(scan: FileSourceScanExec): Boolean = {
+    scan.relation.location.rootPaths.exists {
+      root =>
+        // Walk from the root path up through its parents; getParent returns null past the top.
+        Iterator
+          .iterate(root)(_.getParent)
+          .takeWhile(_ != null)
+          .exists(_.getName == DeltaLogDirName)
+    }
+  }
+
+  /** Before Spark 3.5, a Delta scan that may read deletion vectors stays on vanilla Spark. */
+  private def shouldFallbackSpark34DeletionVectorScan(scan: FileSourceScanExec): Boolean =
+    !SparkVersionUtil.gteSpark35 && containsDeletionVector(scan)
+
+  private def shouldFallbackDeletionVectorScanWithoutMetadataRowIndex(
+      scan: FileSourceScanExec): Boolean = {
+    // Delta DML tests force this path and rely on Spark's injected row-index filter column for
+    // correctness. Keep it on Spark until the native path can prove the same contract for
+    // DML-generated DVs.
+    SparkVersionUtil.gteSpark35 && !useMetadataRowIndex(scan) && containsDeletionVector(scan)
+  }
+
+  /** Value of the metadata-row-index conf for the scan's session ("true" when unset). */
+  private def useMetadataRowIndex(scan: FileSourceScanExec): Boolean = {
+    scan.relation.sparkSession.sessionState.conf
+      .getConfString(DeletionVectorsUseMetadataRowIndexKey, "true")
+      .toBoolean
+  }
+
+  /**
+   * Whether the scan may read deletion vectors. For a prepared scan, the selected files are
+   * checked. For any other Tahoe index, only table-level readability is checked
+   * (`deletionVectorsReadable` on the snapshot's protocol and metadata), so the result may be
+   * true even when no scanned file carries a DV.
+   */
+  private def containsDeletionVector(scan: FileSourceScanExec): Boolean = {
+    scan.relation.location match {
+      case preparedIndex: PreparedDeltaFileIndex =>
+        preparedIndex.preparedScan.files.exists(_.deletionVector != null)
+      case index: TahoeFileIndex =>
+        val snapshot = index.asInstanceOf[SnapshotDescriptor]
+        deletionVectorsReadable(snapshot.protocol, snapshot.metadata)
+      case _ =>
+        false
+    }
+  }
}
diff --git
a/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala
b/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala
index 5f88e18203..c138bed1cd 100644
--- a/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala
+++ b/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala
@@ -19,6 +19,7 @@ package org.apache.gluten.execution
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
+import org.apache.spark.util.SparkVersionUtil
import scala.collection.JavaConverters._
@@ -37,6 +38,7 @@ abstract class DeltaSuite extends WholeStageTransformerSuite {
.set("spark.memory.offHeap.size", "2g")
.set("spark.unsafe.exceptionOnMemoryLeak", "true")
.set("spark.sql.autoBroadcastJoinThreshold", "-1")
+ .set("spark.sql.ansi.enabled", "false")
.set("spark.sql.sources.useV1SourceList", "avro")
.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.set("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog")
@@ -422,12 +424,16 @@ abstract class DeltaSuite extends
WholeStageTransformerSuite {
s"ALTER TABLE delta.`$path` SET TBLPROPERTIES
('delta.enableDeletionVectors' = true)")
checkAnswer(spark.read.format("delta").load(path), df1.union(df2))
spark.sql(s"DELETE FROM delta.`$path` WHERE id IN
(${values2.mkString(", ")})")
- import org.apache.spark.sql.execution.GlutenImplicits._
val df = spark.read.format("delta").load(path)
- assert(
- df.fallbackSummary.fallbackNodeToReason
- .flatMap(_.values)
- .exists(_.contains("Deletion vector is not supported in native")))
+ val executedPlan = df.queryExecution.executedPlan
+ if (SparkVersionUtil.gteSpark35) {
+ assert(executedPlan.collect { case _: DeltaScanTransformer => true
}.nonEmpty)
+ val planText = executedPlan.toString()
+ assert(!planText.contains("__delta_internal_is_row_deleted"))
+ assert(!planText.contains("__delta_internal_row_index"))
+ } else {
+ assert(executedPlan.collect { case _: DeltaScanTransformer => true
}.isEmpty)
+ }
checkAnswer(df, df1)
}
}
@@ -533,13 +539,13 @@ abstract class DeltaSuite extends
WholeStageTransformerSuite {
withSQLConf("spark.gluten.sql.columnar.scanOnly" -> "true") {
withTable("delta_pf") {
spark.sql(s"""
- |create table test (id int, name string) using delta
+ |create table delta_pf (id int, name string) using delta
|""".stripMargin)
spark.sql(s"""
- |insert into test values (1, "v1"), (2, "v2"), (3, "v1"),
(4, "v2")
+ |insert into delta_pf values (1, "v1"), (2, "v2"), (3,
"v1"), (4, "v2")
|""".stripMargin)
runQueryAndCompare(
- "select id from test where name > 'v1'",
+ "select id from delta_pf where name > 'v1'",
compareResult = true,
noFallBack = false) {
df =>
diff --git
a/gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/DeltaLocalFilesBuilder.java
similarity index 54%
copy from
gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala
copy to
gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/DeltaLocalFilesBuilder.java
index 5fe1b4ba86..c448dca508 100644
---
a/gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala
+++
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/DeltaLocalFilesBuilder.java
@@ -14,19 +14,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.extension
+package org.apache.gluten.substrait.rel;
-import org.apache.gluten.execution.DeltaScanTransformer
-import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
+import
org.apache.gluten.substrait.rel.DeltaLocalFilesNode.DeltaFileReadOptions;
-import org.apache.spark.sql.delta.DeltaParquetFileFormat
-import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
+import java.util.List;
+import java.util.Map;
-case class OffloadDeltaScan() extends OffloadSingleNode {
- override def offload(plan: SparkPlan): SparkPlan = plan match {
- case scan: FileSourceScanExec
- if scan.relation.fileFormat.getClass ==
classOf[DeltaParquetFileFormat] =>
- DeltaScanTransformer(scan)
- case other => other
+/** Static factory for Delta-aware {@link LocalFilesNode} splits; not instantiable. */
+public class DeltaLocalFilesBuilder {
+  private DeltaLocalFilesBuilder() {}
+
+  /**
+   * Wraps a generically built {@link LocalFilesNode} into a {@link DeltaLocalFilesNode} that
+   * carries one {@link DeltaFileReadOptions} per file. The given Delta-normalized metadata
+   * replaces the extra metadata of {@code base}.
+   *
+   * @param base node whose file listing and scan settings are copied
+   * @param otherMetadataColumns per-file extra metadata replacing that of {@code base}
+   * @param deltaReadOptions per-file Delta read options; one entry per file path
+   * @return the decorated node
+   * @throws IllegalArgumentException if {@code deltaReadOptions} does not match the file count
+   */
+  public static DeltaLocalFilesNode makeDeltaLocalFiles(
+      LocalFilesNode base,
+      List<Map<String, Object>> otherMetadataColumns,
+      List<DeltaFileReadOptions> deltaReadOptions) {
+    DeltaLocalFilesNode decorated =
+        new DeltaLocalFilesNode(base, otherMetadataColumns, deltaReadOptions);
+    return decorated;
}
}
diff --git
a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/DeltaLocalFilesNode.java
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/DeltaLocalFilesNode.java
new file mode 100644
index 0000000000..f79486947a
--- /dev/null
+++
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/DeltaLocalFilesNode.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.substrait.rel;
+
+import com.google.protobuf.ByteString;
+import io.substrait.proto.ReadRel;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link LocalFilesNode} that additionally writes one Substrait {@code DeltaReadOptions}
+ * message per file: the row-index filter type and, when the file has one, its serialized
+ * deletion vector.
+ */
+public class DeltaLocalFilesNode extends LocalFilesNode {
+  // One entry per file path, in the same order as getPaths().
+  private final List<DeltaFileReadOptions> deltaReadOptions = new ArrayList<>();
+
+  /**
+   * Copies {@code base} and attaches per-file Delta read options.
+   *
+   * @param base node whose file listing and scan settings are copied
+   * @param otherMetadataColumns per-file extra metadata replacing that of {@code base}
+   * @param deltaReadOptions per-file Delta read options, one non-null entry per file path
+   * @throws IllegalArgumentException if {@code deltaReadOptions} is null, does not have one entry
+   *     per file path, or contains a null entry
+   */
+  DeltaLocalFilesNode(
+      LocalFilesNode base,
+      List<Map<String, Object>> otherMetadataColumns,
+      List<DeltaFileReadOptions> deltaReadOptions) {
+    super(base, otherMetadataColumns);
+    if (deltaReadOptions == null || deltaReadOptions.size() != getPaths().size()) {
+      throw new IllegalArgumentException(
+          String.format(
+              "deltaReadOptions must contain one entry per file path, expected %d but got %s",
+              getPaths().size(),
+              deltaReadOptions == null ? "null" : String.valueOf(deltaReadOptions.size())));
+    }
+    for (int i = 0; i < deltaReadOptions.size(); i++) {
+      if (deltaReadOptions.get(i) == null) {
+        // Reject here: a null entry would otherwise only fail later, with an NPE in
+        // processFileBuilder, when the plan is serialized.
+        throw new IllegalArgumentException(
+            String.format("deltaReadOptions entry %d (file %s) is null", i, getPaths().get(i)));
+      }
+    }
+    this.deltaReadOptions.addAll(deltaReadOptions);
+  }
+
+  /**
+   * Writes the Delta read options of file {@code index} into its Substrait file builder. The
+   * deletion-vector cardinality and bytes are only set when the file has a deletion vector.
+   */
+  @Override
+  protected void processFileBuilder(ReadRel.LocalFiles.FileOrFiles.Builder fileBuilder, int index) {
+    DeltaFileReadOptions options = deltaReadOptions.get(index);
+    ReadRel.LocalFiles.FileOrFiles.DeltaReadOptions.Builder deltaBuilder =
+        ReadRel.LocalFiles.FileOrFiles.DeltaReadOptions.newBuilder()
+            .setRowIndexFilterType(toProtoRowIndexFilterType(options.rowIndexFilterType()))
+            .setHasDeletionVector(options.hasDeletionVector());
+
+    if (options.hasDeletionVector()) {
+      deltaBuilder
+          .setDeletionVectorCardinality(options.deletionVectorCardinality())
+          .setSerializedDeletionVector(ByteString.copyFrom(options.serializedDeletionVector()));
+    }
+
+    fileBuilder.setDelta(deltaBuilder.build());
+  }
+
+  /** Maps the JVM-side filter type onto the protobuf enum; KEEP_ALL is also the default. */
+  private static ReadRel.LocalFiles.FileOrFiles.DeltaReadOptions.RowIndexFilterType
+      toProtoRowIndexFilterType(RowIndexFilterType rowIndexFilterType) {
+    switch (rowIndexFilterType) {
+      case IF_CONTAINED:
+        return ReadRel.LocalFiles.FileOrFiles.DeltaReadOptions.RowIndexFilterType.IF_CONTAINED;
+      case IF_NOT_CONTAINED:
+        return ReadRel.LocalFiles.FileOrFiles.DeltaReadOptions.RowIndexFilterType.IF_NOT_CONTAINED;
+      case KEEP_ALL:
+      default:
+        return ReadRel.LocalFiles.FileOrFiles.DeltaReadOptions.RowIndexFilterType.KEEP_ALL;
+    }
+  }
+
+  /** JVM-side mirror of the protobuf {@code DeltaReadOptions.RowIndexFilterType} enum. */
+  public enum RowIndexFilterType {
+    KEEP_ALL,
+    IF_CONTAINED,
+    IF_NOT_CONTAINED
+  }
+
+  /** Immutable, serializable per-file Delta read options. */
+  public static class DeltaFileReadOptions implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final RowIndexFilterType rowIndexFilterType;
+    private final boolean hasDeletionVector;
+    private final long deletionVectorCardinality;
+    private final byte[] serializedDeletionVector;
+
+    /**
+     * @param rowIndexFilterType row-index filter type; must not be null
+     * @param hasDeletionVector whether the file carries a deletion vector
+     * @param deletionVectorCardinality cardinality reported for the deletion vector
+     * @param serializedDeletionVector serialized deletion vector; null is stored as an empty array
+     * @throws IllegalArgumentException if {@code rowIndexFilterType} is null
+     */
+    public DeltaFileReadOptions(
+        RowIndexFilterType rowIndexFilterType,
+        boolean hasDeletionVector,
+        long deletionVectorCardinality,
+        byte[] serializedDeletionVector) {
+      if (rowIndexFilterType == null) {
+        // A null type would otherwise only surface as an NPE in the enum switch of
+        // toProtoRowIndexFilterType, when the split is serialized.
+        throw new IllegalArgumentException("rowIndexFilterType must not be null");
+      }
+      this.rowIndexFilterType = rowIndexFilterType;
+      this.hasDeletionVector = hasDeletionVector;
+      this.deletionVectorCardinality = deletionVectorCardinality;
+      this.serializedDeletionVector =
+          serializedDeletionVector == null ? new byte[0] : serializedDeletionVector;
+    }
+
+    public RowIndexFilterType rowIndexFilterType() {
+      return rowIndexFilterType;
+    }
+
+    public boolean hasDeletionVector() {
+      return hasDeletionVector;
+    }
+
+    public long deletionVectorCardinality() {
+      return deletionVectorCardinality;
+    }
+
+    /** Returns the stored array itself (not a copy); callers must not modify it. */
+    public byte[] serializedDeletionVector() {
+      return serializedDeletionVector;
+    }
+  }
+}
diff --git
a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
index bc42dd302e..8a79351ad2 100644
---
a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
+++
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java
@@ -93,6 +93,27 @@ public class LocalFilesNode implements SplitInfo {
this.iterAsInput = true;
}
+  /**
+   * Copies an existing node, replacing its per-file extra metadata. Lets data-lake subclasses
+   * decorate a generically built node without re-deriving the file listing.
+   *
+   * <p>The copy is shallow:
+   *
+   * <ul>
+   *   <li>List-valued state is appended with {@code addAll} into this node's own lists. These are
+   *       presumably empty at this point via field initializers -- confirm. The list elements
+   *       themselves are therefore shared with {@code other}.
+   *   <li>{@code index}, {@code fileFormat}, {@code fileReadProperties}, {@code iterAsInput} and
+   *       {@code fileSchema} are assigned directly, so any object they reference is shared too.
+   *   <li>{@code other}'s own {@code otherMetadataColumns} are not carried over; only the argument
+   *       is used.
+   * </ul>
+   *
+   * @param other node whose file listing and read settings are copied
+   * @param otherMetadataColumns extra metadata for the copy; must not be null, since it is passed
+   *     to {@code addAll}. NOTE(review): presumably one entry per file in {@code other}'s paths.
+   *     The size is not checked here -- confirm callers keep it aligned.
+   */
+  protected LocalFilesNode(LocalFilesNode other, List<Map<String, Object>> otherMetadataColumns) {
+    this.index = other.index;
+    this.paths.addAll(other.paths);
+    this.starts.addAll(other.starts);
+    this.lengths.addAll(other.lengths);
+    this.fileSizes.addAll(other.fileSizes);
+    this.modificationTimes.addAll(other.modificationTimes);
+    this.partitionColumns.addAll(other.partitionColumns);
+    this.metadataColumns.addAll(other.metadataColumns);
+    this.fileFormat = other.fileFormat;
+    this.preferredLocations.addAll(other.preferredLocations);
+    this.fileReadProperties = other.fileReadProperties; // assigned, not copied
+    this.iterAsInput = other.iterAsInput;
+    this.fileSchema = other.fileSchema; // assigned, not copied
+    this.otherMetadataColumns.addAll(otherMetadataColumns); // replaces other's extra metadata
+  }
+
public List<String> getPaths() {
return paths;
}
diff --git a/gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto b/gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto
index 2bfb68e097..02c7f4cc5c 100644
--- a/gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto
+++ b/gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto
@@ -197,6 +197,18 @@ message ReadRel {
repeated DeleteFile delete_files = 3;
}
+      message DeltaReadOptions { // Delta Lake read options; JVM side: DeltaFileReadOptions
+        enum RowIndexFilterType { // mirrored by the JVM RowIndexFilterType enum
+          KEEP_ALL = 0; // value when unset; also the JVM mapping's fallback
+          IF_CONTAINED = 1; // NOTE(review): presumably drop rows whose index is in the DV
+          IF_NOT_CONTAINED = 2; // NOTE(review): presumably keep only rows in the DV
+        }
+        RowIndexFilterType row_index_filter_type = 1;
+        bool has_deletion_vector = 2;
+        uint64 deletion_vector_cardinality = 3; // set from a signed Java long; keep it >= 0
+        bytes serialized_deletion_vector = 4; // JVM options normalize a null array to empty
+      }
+
// File reading options
oneof file_format {
ParquetReadOptions parquet = 9;
@@ -207,6 +219,7 @@ message ReadRel {
TextReadOptions text = 14;
JsonReadOptions json = 15;
IcebergReadOptions iceberg = 16;
+ DeltaReadOptions delta = 22;
}
message partitionColumn {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]