This is an automated email from the ASF dual-hosted git repository.

yma pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 4b6e929ccc fix: push down input_file_name expression to transformer 
scan in delta (#7483)
4b6e929ccc is described below

commit 4b6e929cccd31d0b6312dc4263746583321f022c
Author: Qian Sun <[email protected]>
AuthorDate: Sat Oct 12 14:15:47 2024 +0800

    fix: push down input_file_name expression to transformer scan in delta 
(#7483)
---
 .../extension/DeltaRewriteTransformerRules.scala   | 22 ++++++++++---
 .../apache/gluten/execution/VeloxDeltaSuite.scala  | 38 ++++++++++++++++++++++
 2 files changed, 56 insertions(+), 4 deletions(-)

diff --git 
a/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaRewriteTransformerRules.scala
 
b/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaRewriteTransformerRules.scala
index fed837d308..3dc1260321 100644
--- 
a/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaRewriteTransformerRules.scala
+++ 
b/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaRewriteTransformerRules.scala
@@ -17,21 +17,21 @@
 package org.apache.gluten.extension
 
 import org.apache.gluten.execution.{DeltaScanTransformer, 
ProjectExecTransformer}
-import 
org.apache.gluten.extension.DeltaRewriteTransformerRules.columnMappingRule
+import 
org.apache.gluten.extension.DeltaRewriteTransformerRules.{columnMappingRule, 
pushDownInputFileExprRule}
 import org.apache.gluten.extension.columnar.RewriteTransformerRules
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeReference, Expression, InputFileBlockLength, InputFileBlockStart, 
InputFileName}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaParquetFileFormat, 
NoMapping}
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.FileFormat
 
 import scala.collection.mutable.ListBuffer
 
 class DeltaRewriteTransformerRules extends RewriteTransformerRules {
-  override def rules: Seq[Rule[SparkPlan]] = columnMappingRule :: Nil
+  override def rules: Seq[Rule[SparkPlan]] = columnMappingRule :: 
pushDownInputFileExprRule :: Nil
 }
 
 object DeltaRewriteTransformerRules {
@@ -58,6 +58,13 @@ object DeltaRewriteTransformerRules {
         transformColumnMappingPlan(p)
     }
 
+  val pushDownInputFileExprRule: Rule[SparkPlan] = (plan: SparkPlan) =>
+    plan.transformUp {
+      case p @ ProjectExec(projectList, child: DeltaScanTransformer)
+          if projectList.exists(containsInputFileRelatedExpr) =>
+        child.copy(output = p.output)
+    }
+
   private def isDeltaColumnMappingFileFormat(fileFormat: FileFormat): Boolean 
= fileFormat match {
     case d: DeltaParquetFileFormat if d.columnMappingMode != NoMapping =>
       true
@@ -65,6 +72,13 @@ object DeltaRewriteTransformerRules {
       false
   }
 
+  private def containsInputFileRelatedExpr(expr: Expression): Boolean = {
+    expr match {
+      case _: InputFileName | _: InputFileBlockStart | _: InputFileBlockLength 
=> true
+      case _ => expr.children.exists(containsInputFileRelatedExpr)
+    }
+  }
+
   /**
    * This method is only used for Delta ColumnMapping FileFormat(e.g. 
nameMapping and idMapping)
    * transform the metadata of Delta into Parquet's, each plan should only be 
transformed once.
diff --git 
a/gluten-delta/src/test/scala/org/apache/gluten/execution/VeloxDeltaSuite.scala 
b/gluten-delta/src/test/scala/org/apache/gluten/execution/VeloxDeltaSuite.scala
index f90bff2afa..70b5ffc688 100644
--- 
a/gluten-delta/src/test/scala/org/apache/gluten/execution/VeloxDeltaSuite.scala
+++ 
b/gluten-delta/src/test/scala/org/apache/gluten/execution/VeloxDeltaSuite.scala
@@ -211,4 +211,42 @@ class VeloxDeltaSuite extends WholeStageTransformerSuite {
         checkAnswer(spark.read.format("delta").load(path), df1)
     }
   }
+
+  testWithSpecifiedSparkVersion("delta: push down input_file_name expression", 
Some("3.2")) {
+    withTable("source_table") {
+      withTable("target_table") {
+        spark.sql(s"""
+                     |CREATE TABLE source_table(id INT, name STRING, age INT) 
USING delta;
+                     |""".stripMargin)
+
+        spark.sql(s"""
+                     |CREATE TABLE target_table(id INT, name STRING, age INT) 
USING delta;
+                     |
+                     |""".stripMargin)
+
+        spark.sql(s"""
+                     |INSERT INTO source_table VALUES(1, 'a', 10),(2, 'b', 20);
+                     |""".stripMargin)
+
+        spark.sql(s"""
+                     |INSERT INTO target_table VALUES(1, 'c', 10),(3, 'c', 30);
+                     |""".stripMargin)
+
+        spark.sql(s"""
+                     |MERGE INTO target_table AS target
+                     |USING source_table AS source
+                     |ON target.id = source.id
+                     |WHEN MATCHED THEN
+                     |UPDATE SET
+                     |  target.name = source.name,
+                     |  target.age = source.age
+                     |WHEN NOT MATCHED THEN
+                     |INSERT (id, name, age) VALUES (source.id, source.name, 
source.age);
+                     |""".stripMargin)
+
+        val df1 = runQueryAndCompare("SELECT * FROM target_table") { _ => }
+        checkAnswer(df1, Row(1, "a", 10) :: Row(2, "b", 20) :: Row(3, "c", 30) 
:: Nil)
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to