This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new e269ba2f9a [VL] Base Delta read support for Spark 4.0 + Delta 4.0 (#10973)
e269ba2f9a is described below

commit e269ba2f9ad5d1ebb4cf696e090227de625831dc
Author: Hongze Zhang <[email protected]>
AuthorDate: Fri Oct 31 11:31:56 2025 +0000

    [VL] Base Delta read support for Spark 4.0 + Delta 4.0 (#10973)
---
 .../sql/perf/DeltaOptimizedWriterTransformer.scala |  0
 .../execution/DeltaFilterExecTransformer.scala     | 80 +++++++++++++++++++
 .../execution/DeltaProjectExecTransformer.scala    | 92 ++++++++++++++++++++++
 .../sql/hive/HiveTableScanExecTransformer.scala    |  4 +-
 pom.xml                                            |  4 +-
 5 files changed, 176 insertions(+), 4 deletions(-)

diff --git a/gluten-delta/src-delta33/main/scala/org/apache/spark/sql/perf/DeltaOptimizedWriterTransformer.scala b/backends-clickhouse/src-delta33/main/scala/org/apache/spark/sql/perf/DeltaOptimizedWriterTransformer.scala
similarity index 100%
rename from gluten-delta/src-delta33/main/scala/org/apache/spark/sql/perf/DeltaOptimizedWriterTransformer.scala
rename to backends-clickhouse/src-delta33/main/scala/org/apache/spark/sql/perf/DeltaOptimizedWriterTransformer.scala
diff --git a/gluten-delta/src-delta40/main/scala/org/apache/gluten/execution/DeltaFilterExecTransformer.scala b/gluten-delta/src-delta40/main/scala/org/apache/gluten/execution/DeltaFilterExecTransformer.scala
new file mode 100644
index 0000000000..b71ff4ca4b
--- /dev/null
+++ b/gluten-delta/src-delta40/main/scala/org/apache/gluten/execution/DeltaFilterExecTransformer.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.expression.{ConverterUtils, ExpressionConverter}
+import org.apache.gluten.metrics.MetricsUpdater
+import org.apache.gluten.substrait.`type`.TypeBuilder
+import org.apache.gluten.substrait.SubstraitContext
+import org.apache.gluten.substrait.extensions.ExtensionBuilder
+import org.apache.gluten.substrait.rel.{RelBuilder, RelNode}
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.delta.metric.IncrementMetric
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.metric.SQLMetric
+
+import scala.collection.JavaConverters._
+
+/**
+ * Filter transformer used for Delta tables.
+ *
+ * Delta can wrap a filter condition in [[IncrementMetric]], an expression that pairs a child
+ * expression with a [[SQLMetric]]. When the condition is wrapped at the top level, only the
+ * wrapped child is converted to Substrait, and the metric is remembered so that
+ * [[metricsUpdater]] can pass it to the backend's filter metrics updater.
+ */
+case class DeltaFilterExecTransformer(condition: Expression, child: SparkPlan)
+  extends FilterExecTransformerBase(condition, child) {
+
+  /**
+   * (name, metric) pairs unwrapped from [[IncrementMetric]] by the most recent call to
+   * [[getRelNode]]; empty until that method has run.
+   */
+  private var extraMetrics: Seq[(String, SQLMetric)] = Seq.empty
+
+  override def metricsUpdater(): MetricsUpdater =
+    BackendsApiManager.getMetricsApiInstance.genFilterTransformerMetricsUpdater(
+      metrics,
+      extraMetrics)
+
+  /**
+   * Builds the Substrait filter relation for `condExpr` on top of `input`.
+   *
+   * @param context Substrait context the expressions and relation are built in.
+   * @param condExpr filter condition; must not be null. A top-level [[IncrementMetric]] is
+   *   unwrapped and its metric recorded for [[metricsUpdater]].
+   * @param originalInputAttributes child output attributes the condition is bound against.
+   * @param operatorId id of this operator in the Substrait plan.
+   * @param input child relation.
+   * @param validation when true, the input schema is attached as an advanced extension so the
+   *   backend can validate the relation.
+   * @return the filter relation.
+   */
+  override def getRelNode(
+      context: SubstraitContext,
+      condExpr: Expression,
+      originalInputAttributes: Seq[Attribute],
+      operatorId: Long,
+      input: RelNode,
+      validation: Boolean): RelNode = {
+    assert(condExpr != null, "DeltaFilterExecTransformer requires a non-null condition")
+    val (convertibleCond, foundMetrics) = condExpr match {
+      case im @ IncrementMetric(wrapped, metric) => (wrapped, Seq((im.prettyName, metric)))
+      case other => (other, Seq.empty[(String, SQLMetric)])
+    }
+    // Replace rather than append: this method serves both validation and transformation (see
+    // the `validation` flag), so it can run more than once on the same node, and appending
+    // would hand the same metric to the metrics updater several times.
+    extraMetrics = foundMetrics
+
+    val condExprNode = ExpressionConverter
+      .replaceWithExpressionTransformer(convertibleCond, attributeSeq = originalInputAttributes)
+      .doTransform(context)
+
+    if (!validation) {
+      RelBuilder.makeFilterRel(input, condExprNode, context, operatorId)
+    } else {
+      // Use an extension node to send the input types through the Substrait plan for validation.
+      val inputTypeNodeList = originalInputAttributes
+        .map(attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable))
+        .asJava
+      val extensionNode = ExtensionBuilder.makeAdvancedExtension(
+        BackendsApiManager.getTransformerApiInstance.packPBMessage(
+          TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf))
+      RelBuilder.makeFilterRel(input, condExprNode, extensionNode, context, operatorId)
+    }
+  }
+
+  override protected def withNewChildInternal(newChild: SparkPlan): DeltaFilterExecTransformer =
+    copy(child = newChild)
+}
diff --git a/gluten-delta/src-delta40/main/scala/org/apache/gluten/execution/DeltaProjectExecTransformer.scala b/gluten-delta/src-delta40/main/scala/org/apache/gluten/execution/DeltaProjectExecTransformer.scala
new file mode 100644
index 0000000000..dc709dad2d
--- /dev/null
+++ b/gluten-delta/src-delta40/main/scala/org/apache/gluten/execution/DeltaProjectExecTransformer.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.expression.{ConverterUtils, ExpressionConverter, 
ExpressionTransformer}
+import org.apache.gluten.metrics.MetricsUpdater
+import org.apache.gluten.substrait.`type`.TypeBuilder
+import org.apache.gluten.substrait.SubstraitContext
+import org.apache.gluten.substrait.extensions.ExtensionBuilder
+import org.apache.gluten.substrait.rel.{RelBuilder, RelNode}
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
NamedExpression}
+import org.apache.spark.sql.delta.metric.IncrementMetric
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.metric.SQLMetric
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/**
+ * Project transformer used for Delta tables.
+ *
+ * Delta can embed [[IncrementMetric]] expressions (a child expression paired with a
+ * [[SQLMetric]]) inside aliased project expressions. Before conversion to Substrait each such
+ * wrapper is replaced by its child, and the metrics are remembered so that [[metricsUpdater]]
+ * can pass them to the backend's project metrics updater.
+ */
+case class DeltaProjectExecTransformer(projectList: Seq[NamedExpression], child: SparkPlan)
+  extends ProjectExecTransformerBase(projectList, child) {
+
+  /**
+   * (name, metric) pairs unwrapped from [[IncrementMetric]] by the most recent call to
+   * [[genNewProjectList]]; empty until that method has run.
+   */
+  private var extraMetrics: Seq[(String, SQLMetric)] = Seq.empty
+
+  override def metricsUpdater(): MetricsUpdater =
+    BackendsApiManager.getMetricsApiInstance.genProjectTransformerMetricsUpdater(
+      metrics,
+      extraMetrics)
+
+  /**
+   * Builds the Substrait project relation for `projectList` on top of `input`.
+   *
+   * [[IncrementMetric]] wrappers are stripped first via [[genNewProjectList]]. The emit start
+   * index is set to the number of input attributes (presumably so only the projected
+   * expressions are emitted — confirm against `RelBuilder.makeProjectRel`). When `validation`
+   * is true, the input schema is attached as an advanced extension so the backend can validate
+   * the relation.
+   */
+  override def getRelNode(
+      context: SubstraitContext,
+      projectList: Seq[NamedExpression],
+      originalInputAttributes: Seq[Attribute],
+      operatorId: Long,
+      input: RelNode,
+      validation: Boolean): RelNode = {
+    val newProjectList = genNewProjectList(projectList)
+    val columnarProjExprs: Seq[ExpressionTransformer] = ExpressionConverter
+      .replaceWithExpressionTransformer(newProjectList, attributeSeq = originalInputAttributes)
+    val projExprNodeList = columnarProjExprs.map(_.doTransform(context)).asJava
+    val emitStartIndex = originalInputAttributes.size
+    if (!validation) {
+      RelBuilder.makeProjectRel(input, projExprNodeList, context, operatorId, emitStartIndex)
+    } else {
+      // Use an extension node to send the input types through the Substrait plan for validation.
+      val inputTypeNodeList = originalInputAttributes
+        .map(attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable))
+        .asJava
+      val extensionNode = ExtensionBuilder.makeAdvancedExtension(
+        BackendsApiManager.getTransformerApiInstance.packPBMessage(
+          TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf))
+      RelBuilder.makeProjectRel(
+        input,
+        projExprNodeList,
+        extensionNode,
+        context,
+        operatorId,
+        emitStartIndex)
+    }
+  }
+
+  override protected def withNewChildInternal(newChild: SparkPlan): DeltaProjectExecTransformer =
+    copy(child = newChild)
+
+  /**
+   * Returns `projectList` with every [[IncrementMetric]] inside an [[Alias]] replaced by its
+   * child. The removed metrics replace whatever an earlier call recorded for
+   * [[metricsUpdater]]. Entries that are not aliases are returned unchanged.
+   */
+  def genNewProjectList(projectList: Seq[NamedExpression]): Seq[NamedExpression] = {
+    val found = mutable.ArrayBuffer.empty[(String, SQLMetric)]
+    val newProjectList = projectList.map {
+      case alias: Alias =>
+        val newChild = alias.child.transformUp {
+          case im @ IncrementMetric(wrapped, metric) =>
+            found += ((im.prettyName, metric))
+            wrapped
+        }
+        if (newChild eq alias.child) {
+          alias
+        } else {
+          // Copy instead of `Alias(newChild, name)(exprId)`, which would reset the qualifier
+          // and metadata of the output column to their defaults.
+          alias.copy(child = newChild)(
+            exprId = alias.exprId,
+            qualifier = alias.qualifier,
+            explicitMetadata = alias.explicitMetadata,
+            nonInheritableMetadataKeys = alias.nonInheritableMetadataKeys)
+        }
+      case other => other
+    }
+    // Replace rather than append: getRelNode (and thus this method) serves both validation and
+    // transformation, so appending would register the same metric more than once.
+    extraMetrics = found.toSeq
+    newProjectList
+  }
+}
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
index 39500a50e7..585c2a85d4 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
@@ -45,12 +45,12 @@ case class HiveTableScanExecTransformer(
     requestedAttributes: Seq[Attribute],
     relation: HiveTableRelation,
     partitionPruningPred: Seq[Expression],
-    prunedOutput: Seq[Attribute] = Seq.empty[Attribute])(@transient session: 
SparkSession)
+    prunedOutput: Seq[Attribute] = Seq.empty[Attribute])(@transient 
newSession: SparkSession)
   extends AbstractHiveTableScanExec(
     requestedAttributes,
     relation,
     partitionPruningPred,
-    prunedOutput)(session)
+    prunedOutput)(newSession)
   with BasicScanExecTransformer {
 
   @transient override lazy val metrics: Map[String, SQLMetric] =
diff --git a/pom.xml b/pom.xml
index 4a97092e36..a03e040b74 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1163,8 +1163,8 @@
         <spark.version>4.0.1</spark.version>
         <iceberg.version>1.10.0</iceberg.version>
         <delta.package.name>delta-spark</delta.package.name>
-        <delta.version>3.3.1</delta.version>
-        <delta.binary.version>33</delta.binary.version>
+        <delta.version>4.0.0</delta.version>
+        <delta.binary.version>40</delta.binary.version>
         <!-- TODO: This hudi version is invalid for Spark 4.0. -->
         <hudi.version>0.15.0</hudi.version>
         <fasterxml.version>2.15.1</fasterxml.version>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to