This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new e269ba2f9a [VL] Base Delta read support for Spark 4.0 + Delta 4.0
(#10973)
e269ba2f9a is described below
commit e269ba2f9ad5d1ebb4cf696e090227de625831dc
Author: Hongze Zhang <[email protected]>
AuthorDate: Fri Oct 31 11:31:56 2025 +0000
[VL] Base Delta read support for Spark 4.0 + Delta 4.0 (#10973)
---
.../sql/perf/DeltaOptimizedWriterTransformer.scala | 0
.../execution/DeltaFilterExecTransformer.scala | 80 +++++++++++++++++++
.../execution/DeltaProjectExecTransformer.scala | 92 ++++++++++++++++++++++
.../sql/hive/HiveTableScanExecTransformer.scala | 4 +-
pom.xml | 4 +-
5 files changed, 176 insertions(+), 4 deletions(-)
diff --git
a/gluten-delta/src-delta33/main/scala/org/apache/spark/sql/perf/DeltaOptimizedWriterTransformer.scala
b/backends-clickhouse/src-delta33/main/scala/org/apache/spark/sql/perf/DeltaOptimizedWriterTransformer.scala
similarity index 100%
rename from
gluten-delta/src-delta33/main/scala/org/apache/spark/sql/perf/DeltaOptimizedWriterTransformer.scala
rename to
backends-clickhouse/src-delta33/main/scala/org/apache/spark/sql/perf/DeltaOptimizedWriterTransformer.scala
diff --git
a/gluten-delta/src-delta40/main/scala/org/apache/gluten/execution/DeltaFilterExecTransformer.scala
b/gluten-delta/src-delta40/main/scala/org/apache/gluten/execution/DeltaFilterExecTransformer.scala
new file mode 100644
index 0000000000..b71ff4ca4b
--- /dev/null
+++
b/gluten-delta/src-delta40/main/scala/org/apache/gluten/execution/DeltaFilterExecTransformer.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.expression.{ConverterUtils, ExpressionConverter}
+import org.apache.gluten.metrics.MetricsUpdater
+import org.apache.gluten.substrait.`type`.TypeBuilder
+import org.apache.gluten.substrait.SubstraitContext
+import org.apache.gluten.substrait.extensions.ExtensionBuilder
+import org.apache.gluten.substrait.rel.{RelBuilder, RelNode}
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.delta.metric.IncrementMetric
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.metric.SQLMetric
+
+import scala.collection.JavaConverters._
+
+case class DeltaFilterExecTransformer(condition: Expression, child: SparkPlan)
+ extends FilterExecTransformerBase(condition, child) {
+
+ private var extraMetrics: Seq[(String, SQLMetric)] = Seq.empty
+
+ override def metricsUpdater(): MetricsUpdater =
+
BackendsApiManager.getMetricsApiInstance.genFilterTransformerMetricsUpdater(
+ metrics,
+ extraMetrics.toSeq)
+
+ override def getRelNode(
+ context: SubstraitContext,
+ condExpr: Expression,
+ originalInputAttributes: Seq[Attribute],
+ operatorId: Long,
+ input: RelNode,
+ validation: Boolean): RelNode = {
+ assert(condExpr != null)
+ val condExprNode = condExpr match {
+ case IncrementMetric(child, metric) =>
+ extraMetrics :+= (condExpr.prettyName, metric)
+ ExpressionConverter
+ .replaceWithExpressionTransformer(child, attributeSeq =
originalInputAttributes)
+ .doTransform(context)
+ case _ =>
+ ExpressionConverter
+ .replaceWithExpressionTransformer(condExpr, attributeSeq =
originalInputAttributes)
+ .doTransform(context)
+ }
+
+ if (!validation) {
+ RelBuilder.makeFilterRel(input, condExprNode, context, operatorId)
+ } else {
+      // Use an extension node to send the input types through Substrait plan
for validation.
+ val inputTypeNodeList = originalInputAttributes
+ .map(attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable))
+ .asJava
+ val extensionNode = ExtensionBuilder.makeAdvancedExtension(
+ BackendsApiManager.getTransformerApiInstance.packPBMessage(
+ TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf))
+ RelBuilder.makeFilterRel(input, condExprNode, extensionNode, context,
operatorId)
+ }
+ }
+
+ override protected def withNewChildInternal(newChild: SparkPlan):
DeltaFilterExecTransformer =
+ copy(child = newChild)
+}
diff --git
a/gluten-delta/src-delta40/main/scala/org/apache/gluten/execution/DeltaProjectExecTransformer.scala
b/gluten-delta/src-delta40/main/scala/org/apache/gluten/execution/DeltaProjectExecTransformer.scala
new file mode 100644
index 0000000000..dc709dad2d
--- /dev/null
+++
b/gluten-delta/src-delta40/main/scala/org/apache/gluten/execution/DeltaProjectExecTransformer.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.expression.{ConverterUtils, ExpressionConverter,
ExpressionTransformer}
+import org.apache.gluten.metrics.MetricsUpdater
+import org.apache.gluten.substrait.`type`.TypeBuilder
+import org.apache.gluten.substrait.SubstraitContext
+import org.apache.gluten.substrait.extensions.ExtensionBuilder
+import org.apache.gluten.substrait.rel.{RelBuilder, RelNode}
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute,
NamedExpression}
+import org.apache.spark.sql.delta.metric.IncrementMetric
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.metric.SQLMetric
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+case class DeltaProjectExecTransformer(projectList: Seq[NamedExpression],
child: SparkPlan)
+ extends ProjectExecTransformerBase(projectList, child) {
+
+ private var extraMetrics = mutable.Seq.empty[(String, SQLMetric)]
+
+ override def metricsUpdater(): MetricsUpdater =
+
BackendsApiManager.getMetricsApiInstance.genProjectTransformerMetricsUpdater(
+ metrics,
+ extraMetrics.toSeq)
+
+ override def getRelNode(
+ context: SubstraitContext,
+ projectList: Seq[NamedExpression],
+ originalInputAttributes: Seq[Attribute],
+ operatorId: Long,
+ input: RelNode,
+ validation: Boolean): RelNode = {
+ val newProjectList = genNewProjectList(projectList)
+ val columnarProjExprs: Seq[ExpressionTransformer] = ExpressionConverter
+ .replaceWithExpressionTransformer(newProjectList, attributeSeq =
originalInputAttributes)
+ val projExprNodeList = columnarProjExprs.map(_.doTransform(context)).asJava
+ val emitStartIndex = originalInputAttributes.size
+ if (!validation) {
+ RelBuilder.makeProjectRel(input, projExprNodeList, context, operatorId,
emitStartIndex)
+ } else {
+      // Use an extension node to send the input types through Substrait plan
for validation.
+ val inputTypeNodeList = originalInputAttributes
+ .map(attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable))
+ .asJava
+ val extensionNode = ExtensionBuilder.makeAdvancedExtension(
+ BackendsApiManager.getTransformerApiInstance.packPBMessage(
+ TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf))
+ RelBuilder.makeProjectRel(
+ input,
+ projExprNodeList,
+ extensionNode,
+ context,
+ operatorId,
+ emitStartIndex)
+ }
+ }
+
+ override protected def withNewChildInternal(newChild: SparkPlan):
DeltaProjectExecTransformer =
+ copy(child = newChild)
+
+ def genNewProjectList(projectList: Seq[NamedExpression]):
Seq[NamedExpression] = {
+ projectList.map {
+ case alias: Alias =>
+ val newChild = alias.child.transformUp {
+ case im @ IncrementMetric(child, metric) =>
+ extraMetrics :+= (im.prettyName, metric)
+ child
+ }
+ Alias(child = newChild, name = alias.name)(alias.exprId)
+ case other => other
+ }
+ }
+}
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
index 39500a50e7..585c2a85d4 100644
---
a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
@@ -45,12 +45,12 @@ case class HiveTableScanExecTransformer(
requestedAttributes: Seq[Attribute],
relation: HiveTableRelation,
partitionPruningPred: Seq[Expression],
- prunedOutput: Seq[Attribute] = Seq.empty[Attribute])(@transient session:
SparkSession)
+ prunedOutput: Seq[Attribute] = Seq.empty[Attribute])(@transient
newSession: SparkSession)
extends AbstractHiveTableScanExec(
requestedAttributes,
relation,
partitionPruningPred,
- prunedOutput)(session)
+ prunedOutput)(newSession)
with BasicScanExecTransformer {
@transient override lazy val metrics: Map[String, SQLMetric] =
diff --git a/pom.xml b/pom.xml
index 4a97092e36..a03e040b74 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1163,8 +1163,8 @@
<spark.version>4.0.1</spark.version>
<iceberg.version>1.10.0</iceberg.version>
<delta.package.name>delta-spark</delta.package.name>
- <delta.version>3.3.1</delta.version>
- <delta.binary.version>33</delta.binary.version>
+ <delta.version>4.0.0</delta.version>
+ <delta.binary.version>40</delta.binary.version>
<!-- TODO: This hudi version is invalid for Spark 4.0. -->
<hudi.version>0.15.0</hudi.version>
<fasterxml.version>2.15.1</fasterxml.version>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]