This is an automated email from the ASF dual-hosted git repository.
yma pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
     new 4b6e929ccc fix: push down input_file_name expression to transformer scan in delta (#7483)
4b6e929ccc is described below
commit 4b6e929cccd31d0b6312dc4263746583321f022c
Author: Qian Sun <[email protected]>
AuthorDate: Sat Oct 12 14:15:47 2024 +0800
    fix: push down input_file_name expression to transformer scan in delta (#7483)
---
.../extension/DeltaRewriteTransformerRules.scala | 22 ++++++++++---
.../apache/gluten/execution/VeloxDeltaSuite.scala | 38 ++++++++++++++++++++++
2 files changed, 56 insertions(+), 4 deletions(-)
diff --git a/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaRewriteTransformerRules.scala b/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaRewriteTransformerRules.scala
index fed837d308..3dc1260321 100644
--- a/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaRewriteTransformerRules.scala
+++ b/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaRewriteTransformerRules.scala
@@ -17,21 +17,21 @@
package org.apache.gluten.extension
 import org.apache.gluten.execution.{DeltaScanTransformer, ProjectExecTransformer}
-import org.apache.gluten.extension.DeltaRewriteTransformerRules.columnMappingRule
+import org.apache.gluten.extension.DeltaRewriteTransformerRules.{columnMappingRule, pushDownInputFileExprRule}
import org.apache.gluten.extension.columnar.RewriteTransformerRules
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaParquetFileFormat, NoMapping}
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.FileFormat
import scala.collection.mutable.ListBuffer
class DeltaRewriteTransformerRules extends RewriteTransformerRules {
-  override def rules: Seq[Rule[SparkPlan]] = columnMappingRule :: Nil
+  override def rules: Seq[Rule[SparkPlan]] = columnMappingRule :: pushDownInputFileExprRule :: Nil
}
object DeltaRewriteTransformerRules {
@@ -58,6 +58,13 @@ object DeltaRewriteTransformerRules {
transformColumnMappingPlan(p)
}
+  val pushDownInputFileExprRule: Rule[SparkPlan] = (plan: SparkPlan) =>
+    plan.transformUp {
+      case p @ ProjectExec(projectList, child: DeltaScanTransformer)
+          if projectList.exists(containsInputFileRelatedExpr) =>
+        child.copy(output = p.output)
+    }
+
   private def isDeltaColumnMappingFileFormat(fileFormat: FileFormat): Boolean = fileFormat match {
     case d: DeltaParquetFileFormat if d.columnMappingMode != NoMapping => true
@@ -65,6 +72,13 @@ object DeltaRewriteTransformerRules {
false
}
+  private def containsInputFileRelatedExpr(expr: Expression): Boolean = {
+    expr match {
+      case _: InputFileName | _: InputFileBlockStart | _: InputFileBlockLength => true
+      case _ => expr.children.exists(containsInputFileRelatedExpr)
+    }
+  }
+
+
   /**
    * This method is only used for Delta ColumnMapping FileFormat(e.g. nameMapping and idMapping)
    * transform the metadata of Delta into Parquet's, each plan should only be transformed once.
diff --git a/gluten-delta/src/test/scala/org/apache/gluten/execution/VeloxDeltaSuite.scala b/gluten-delta/src/test/scala/org/apache/gluten/execution/VeloxDeltaSuite.scala
index f90bff2afa..70b5ffc688 100644
--- a/gluten-delta/src/test/scala/org/apache/gluten/execution/VeloxDeltaSuite.scala
+++ b/gluten-delta/src/test/scala/org/apache/gluten/execution/VeloxDeltaSuite.scala
@@ -211,4 +211,42 @@ class VeloxDeltaSuite extends WholeStageTransformerSuite {
checkAnswer(spark.read.format("delta").load(path), df1)
}
}
+
+  testWithSpecifiedSparkVersion("delta: push down input_file_name expression", Some("3.2")) {
+ withTable("source_table") {
+ withTable("target_table") {
+        spark.sql(s"""
+                     |CREATE TABLE source_table(id INT, name STRING, age INT) USING delta;
+                     |""".stripMargin)
+
+        spark.sql(s"""
+                     |CREATE TABLE target_table(id INT, name STRING, age INT) USING delta;
+                     |
+                     |""".stripMargin)
+
+ spark.sql(s"""
+ |INSERT INTO source_table VALUES(1, 'a', 10),(2, 'b', 20);
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |INSERT INTO target_table VALUES(1, 'c', 10),(3, 'c', 30);
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |MERGE INTO target_table AS target
+ |USING source_table AS source
+ |ON target.id = source.id
+ |WHEN MATCHED THEN
+ |UPDATE SET
+ | target.name = source.name,
+ | target.age = source.age
+ |WHEN NOT MATCHED THEN
+          |INSERT (id, name, age) VALUES (source.id, source.name, source.age);
+ |""".stripMargin)
+
+      val df1 = runQueryAndCompare("SELECT * FROM target_table") { _ => }
+      checkAnswer(df1, Row(1, "a", 10) :: Row(2, "b", 20) :: Row(3, "c", 30) :: Nil)
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]