This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 8078f249b [VL] Fix function `input_file_name()` outputs empty string in certain query plan patterns (#7124)
8078f249b is described below

commit 8078f249b58f52516ccb34dc407857fe9cd293bd
Author: Mingliang Zhu <[email protected]>
AuthorDate: Fri Sep 6 16:57:38 2024 +0800

    [VL] Fix function `input_file_name()` outputs empty string in certain query plan patterns (#7124)
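    
    Background (summarized from the doc comment of the new
    PushDownInputFileExpression rule below): Spark evaluates
    input_file_name() and its block variants by reading a thread local
    that the file scan populates per split. When an offloaded operator
    sits between the scan and the project computing the expression, the
    thread local is never set and the function returns an empty string.
    One pattern that triggers this, taken from the test added in this
    patch:
    
        SELECT input_file_name(), a
        FROM
        (SELECT a FROM json_table
        UNION ALL
        SELECT l_orderkey AS a FROM lineitem)
        LIMIT 100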
---
 .../gluten/backendsapi/velox/VeloxBackend.scala    |   2 -
 .../gluten/backendsapi/velox/VeloxRuleApi.scala    |   2 +
 .../execution/ScalarFunctionsValidateSuite.scala   |  17 ++-
 .../gluten/backendsapi/BackendSettingsApi.scala    |   1 -
 .../extension/columnar/MiscColumnarRules.scala     |   2 +-
 .../extension/columnar/OffloadSingleNode.scala     | 152 +--------------------
 .../columnar/PushDownInputFileExpression.scala     | 118 ++++++++++++++++
 7 files changed, 142 insertions(+), 152 deletions(-)

diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 611e9c15b..ecd053a63 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -282,8 +282,6 @@ object VeloxBackendSettings extends BackendSettingsApi {
 
   override def supportNativeRowIndexColumn(): Boolean = true
 
-  override def supportNativeInputFileRelatedExpr(): Boolean = true
-
   override def supportExpandExec(): Boolean = true
 
   override def supportSortExec(): Boolean = true
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index 4ff7f0305..b278224b2 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -52,6 +52,7 @@ private object VeloxRuleApi {
   def injectLegacy(injector: LegacyInjector): Unit = {
     // Gluten columnar: Transform rules.
     injector.injectTransform(_ => RemoveTransitions)
+    injector.injectTransform(_ => PushDownInputFileExpression.PreOffload)
     injector.injectTransform(c => FallbackOnANSIMode.apply(c.session))
     injector.injectTransform(c => FallbackMultiCodegens.apply(c.session))
     injector.injectTransform(c => PlanOneRowRelation.apply(c.session))
@@ -64,6 +65,7 @@ private object VeloxRuleApi {
     injector.injectTransform(_ => TransformPreOverrides())
     injector.injectTransform(_ => RemoveNativeWriteFilesSortAndProject())
     injector.injectTransform(c => RewriteTransformer.apply(c.session))
+    injector.injectTransform(_ => PushDownInputFileExpression.PostOffload)
     injector.injectTransform(_ => EnsureLocalSortRequirements)
     injector.injectTransform(_ => EliminateLocalSort)
     injector.injectTransform(_ => CollapseProjectExecTransformer)
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala
index 81da24f8e..a376fd488 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/ScalarFunctionsValidateSuite.scala
@@ -28,7 +28,7 @@ class ScalarFunctionsValidateSuiteRasOff extends ScalarFunctionsValidateSuite {
     super.sparkConf
       .set("spark.gluten.ras.enabled", "false")
   }
-
+  import testImplicits._
   // Since https://github.com/apache/incubator-gluten/pull/6200.
   test("Test input_file_name function") {
     runQueryAndCompare("""SELECT input_file_name(), l_orderkey
@@ -44,6 +44,21 @@ class ScalarFunctionsValidateSuiteRasOff extends ScalarFunctionsValidateSuite {
                          | limit 100""".stripMargin) {
       checkGlutenOperatorMatch[ProjectExecTransformer]
     }
+    withTempPath {
+      path =>
+        Seq(1, 2, 3).toDF("a").write.json(path.getCanonicalPath)
+        spark.read.json(path.getCanonicalPath).createOrReplaceTempView("json_table")
+        val sql =
+          """
+            |SELECT input_file_name(), a
+            |FROM
+            |(SELECT a FROM json_table
+            |UNION ALL
+            |SELECT l_orderkey as a FROM lineitem)
+            |LIMIT 100
+            |""".stripMargin
+        compareResultsAgainstVanillaSpark(sql, true, { _ => })
+    }
   }
 }
 
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index c9205bae9..451cb2fd2 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -45,7 +45,6 @@ trait BackendSettingsApi {
   def supportNativeWrite(fields: Array[StructField]): Boolean = true
   def supportNativeMetadataColumns(): Boolean = false
   def supportNativeRowIndexColumn(): Boolean = false
-  def supportNativeInputFileRelatedExpr(): Boolean = false
 
   def supportExpandExec(): Boolean = false
   def supportSortExec(): Boolean = false
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
index b7a30f7e1..4e668d7f8 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
@@ -34,7 +34,7 @@ object MiscColumnarRules {
   object TransformPreOverrides {
     def apply(): TransformPreOverrides = {
       TransformPreOverrides(
-        List(OffloadProject(), OffloadFilter()),
+        List(OffloadFilter()),
         List(
           OffloadOthers(),
           OffloadAggregate(),
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
index 3cea5e76a..6047789e6 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
@@ -26,21 +26,17 @@ import org.apache.gluten.utils.{LogLevelUtil, PlanUtil}
 
 import org.apache.spark.api.python.EvalPythonExecTransformer
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, NamedExpression}
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
 import org.apache.spark.sql.catalyst.plans.logical.Join
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
 import org.apache.spark.sql.execution.datasources.WriteFilesExec
-import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2ScanExecBase}
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.execution.python.{ArrowEvalPythonExec, BatchEvalPythonExec}
 import org.apache.spark.sql.execution.window.{WindowExec, WindowGroupLimitExecShim}
 import org.apache.spark.sql.hive.HiveTableScanExecTransformer
-import org.apache.spark.sql.types.{LongType, StringType}
-
-import scala.collection.mutable.Map
 
 /**
 * Converts a vanilla Spark plan node into Gluten plan node. Gluten plan is supposed to be executed
@@ -224,148 +220,6 @@ object OffloadJoin {
   }
 }
 
-case class OffloadProject() extends OffloadSingleNode with LogLevelUtil {
-  private def containsInputFileRelatedExpr(expr: Expression): Boolean = {
-    expr match {
-      case _: InputFileName | _: InputFileBlockStart | _: InputFileBlockLength => true
-      case _ => expr.children.exists(containsInputFileRelatedExpr)
-    }
-  }
-
-  private def rewriteExpr(
-      expr: Expression,
-      replacedExprs: Map[String, AttributeReference]): Expression = {
-    expr match {
-      case _: InputFileName =>
-        replacedExprs.getOrElseUpdate(
-          expr.prettyName,
-          AttributeReference(expr.prettyName, StringType, false)())
-      case _: InputFileBlockStart =>
-        replacedExprs.getOrElseUpdate(
-          expr.prettyName,
-          AttributeReference(expr.prettyName, LongType, false)())
-      case _: InputFileBlockLength =>
-        replacedExprs.getOrElseUpdate(
-          expr.prettyName,
-          AttributeReference(expr.prettyName, LongType, false)())
-      case other =>
-        other.withNewChildren(other.children.map(child => rewriteExpr(child, replacedExprs)))
-    }
-  }
-
-  private def addMetadataCol(
-      plan: SparkPlan,
-      replacedExprs: Map[String, AttributeReference]): SparkPlan = {
-    def genNewOutput(output: Seq[Attribute]): Seq[Attribute] = {
-      var newOutput = output
-      for ((_, newAttr) <- replacedExprs) {
-        if (!newOutput.exists(attr => attr.exprId == newAttr.exprId)) {
-          newOutput = newOutput :+ newAttr
-        }
-      }
-      newOutput
-    }
-    def genNewProjectList(projectList: Seq[NamedExpression]): Seq[NamedExpression] = {
-      var newProjectList = projectList
-      for ((_, newAttr) <- replacedExprs) {
-        if (!newProjectList.exists(attr => attr.exprId == newAttr.exprId)) {
-          newProjectList = newProjectList :+ newAttr.toAttribute
-        }
-      }
-      newProjectList
-    }
-
-    plan match {
-      case f: FileSourceScanExec =>
-        f.copy(output = genNewOutput(f.output))
-      case f: FileSourceScanExecTransformer =>
-        f.copy(output = genNewOutput(f.output))
-      case b: BatchScanExec =>
-        b.copy(output = genNewOutput(b.output).asInstanceOf[Seq[AttributeReference]])
-      case b: BatchScanExecTransformer =>
-        b.copy(output = genNewOutput(b.output).asInstanceOf[Seq[AttributeReference]])
-      case p @ ProjectExec(projectList, child) =>
-        p.copy(genNewProjectList(projectList), addMetadataCol(child, replacedExprs))
-      case p @ ProjectExecTransformer(projectList, child) =>
-        p.copy(genNewProjectList(projectList), addMetadataCol(child, replacedExprs))
-      case u @ UnionExec(children) =>
-        val newFirstChild = addMetadataCol(children.head, replacedExprs)
-        val newOtherChildren = children.tail.map {
-          child =>
-            // Make sure exprId is unique in each child of Union.
-            val newReplacedExprs = replacedExprs.map {
-              expr => (expr._1, AttributeReference(expr._2.name, expr._2.dataType, false)())
-            }
-            addMetadataCol(child, newReplacedExprs)
-        }
-        u.copy(children = newFirstChild +: newOtherChildren)
-      case _ => plan.withNewChildren(plan.children.map(addMetadataCol(_, replacedExprs)))
-    }
-  }
-
-  private def tryOffloadProjectExecWithInputFileRelatedExprs(
-      projectExec: ProjectExec): SparkPlan = {
-    def findScanNodes(plan: SparkPlan): Seq[SparkPlan] = {
-      plan.collect {
-        case f @ (_: FileSourceScanExec | _: AbstractFileSourceScanExec |
-            _: DataSourceV2ScanExecBase) =>
-          f
-      }
-    }
-    val addHint = AddFallbackTagRule()
-    val newProjectList = projectExec.projectList.filterNot(containsInputFileRelatedExpr)
-    val newProjectExec = ProjectExec(newProjectList, projectExec.child)
-    addHint.apply(newProjectExec)
-    if (FallbackTags.nonEmpty(newProjectExec)) {
-      // Project is still not transformable after remove `input_file_name` expressions.
-      projectExec
-    } else {
-      // the project with `input_file_name` expression may have multiple data source
-      // by union all, reference:
-      // https://github.com/apache/spark/blob/e459674127e7b21e2767cc62d10ea6f1f941936c
-      // /sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala#L519
-      val leafScans = findScanNodes(projectExec)
-      if (leafScans.isEmpty || leafScans.exists(FallbackTags.nonEmpty)) {
-        // It means
-        // 1. projectExec has `input_file_name` but no scan child.
-        // 2. It has scan children node but the scan node fallback.
-        projectExec
-      } else {
-        val replacedExprs = scala.collection.mutable.Map[String, AttributeReference]()
-        val newProjectList = projectExec.projectList.map {
-          expr => rewriteExpr(expr, replacedExprs).asInstanceOf[NamedExpression]
-        }
-        val newChild = addMetadataCol(projectExec.child, replacedExprs)
-        logDebug(
-          s"Columnar Processing for ${projectExec.getClass} with " +
-            s"ProjectList ${projectExec.projectList} is currently supported.")
-        ProjectExecTransformer(newProjectList, newChild)
-      }
-    }
-  }
-
-  private def genProjectExec(projectExec: ProjectExec): SparkPlan = {
-    if (
-      FallbackTags.nonEmpty(projectExec) &&
-      BackendsApiManager.getSettings.supportNativeInputFileRelatedExpr() &&
-      projectExec.projectList.exists(containsInputFileRelatedExpr)
-    ) {
-      tryOffloadProjectExecWithInputFileRelatedExprs(projectExec)
-    } else if (FallbackTags.nonEmpty(projectExec)) {
-      projectExec
-    } else {
-      logDebug(s"Columnar Processing for ${projectExec.getClass} is currently 
supported.")
-      ProjectExecTransformer(projectExec.projectList, projectExec.child)
-    }
-  }
-
-  override def offload(plan: SparkPlan): SparkPlan = plan match {
-    case p: ProjectExec =>
-      genProjectExec(p)
-    case other => other
-  }
-}
-
 // Filter transformation.
 case class OffloadFilter() extends OffloadSingleNode with LogLevelUtil {
   import OffloadOthers._
@@ -443,6 +297,10 @@ object OffloadOthers {
         case plan: CoalesceExec =>
           logDebug(s"Columnar Processing for ${plan.getClass} is currently 
supported.")
           ColumnarCoalesceExec(plan.numPartitions, plan.child)
+        case plan: ProjectExec =>
+          val columnarChild = plan.child
+          logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
+          ProjectExecTransformer(plan.projectList, columnarChild)
         case plan: SortAggregateExec =>
           logDebug(s"Columnar Processing for ${plan.getClass} is currently 
supported.")
           HashAggregateExecBaseTransformer.from(plan)
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownInputFileExpression.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownInputFileExpression.scala
new file mode 100644
index 000000000..e1219fead
--- /dev/null
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownInputFileExpression.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar
+
+import org.apache.gluten.execution.{BatchScanExecTransformer, FileSourceScanExecTransformer}
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, NamedExpression}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{DeserializeToObjectExec, LeafExecNode, ProjectExec, SerializeFromObjectExec, SparkPlan, UnionExec}
+
+import scala.collection.mutable
+
+/**
+ * Spark's implementations of input_file_name/input_file_block_start/input_file_block_length use a
+ * thread local to stash the current file name, which the functions then read back. If a
+ * transformer node sits between the scan and the project evaluating an input_file function, the
+ * thread local is never populated and input_file_name returns an empty string. So we push the
+ * input_file functions down to a transformer scan, or add a fallback project above a fallback scan.
+ *
+ * Two rules are involved:
+ *   - Before offload, add a new project above each leaf node and push the input file expressions
+ *     down into it.
+ *   - After offload, if the scan was offloaded, push the input file expressions into the scan and
+ *     remove the project.
+ */
+object PushDownInputFileExpression {
+  def containsInputFileRelatedExpr(expr: Expression): Boolean = {
+    expr match {
+      case _: InputFileName | _: InputFileBlockStart | _: InputFileBlockLength => true
+      case _ => expr.children.exists(containsInputFileRelatedExpr)
+    }
+  }
+
+  object PreOffload extends Rule[SparkPlan] {
+    override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
+      case ProjectExec(projectList, child) if projectList.exists(containsInputFileRelatedExpr) =>
+        val replacedExprs = mutable.Map[String, Alias]()
+        val newProjectList = projectList.map {
+          expr => rewriteExpr(expr, replacedExprs).asInstanceOf[NamedExpression]
+        }
+        val newChild = addMetadataCol(child, replacedExprs)
+        ProjectExec(newProjectList, newChild)
+    }
+
+    private def rewriteExpr(
+        expr: Expression,
+        replacedExprs: mutable.Map[String, Alias]): Expression =
+      expr match {
+        case _: InputFileName =>
+          replacedExprs
+            .getOrElseUpdate(expr.prettyName, Alias(InputFileName(), expr.prettyName)())
+            .toAttribute
+        case _: InputFileBlockStart =>
+          replacedExprs
+            .getOrElseUpdate(expr.prettyName, Alias(InputFileBlockStart(), expr.prettyName)())
+            .toAttribute
+        case _: InputFileBlockLength =>
+          replacedExprs
+            .getOrElseUpdate(expr.prettyName, Alias(InputFileBlockLength(), expr.prettyName)())
+            .toAttribute
+        case other =>
+          other.withNewChildren(other.children.map(child => rewriteExpr(child, replacedExprs)))
+      }
+
+    private def addMetadataCol(
+        plan: SparkPlan,
+        replacedExprs: mutable.Map[String, Alias]): SparkPlan =
+      plan match {
+        case p: LeafExecNode =>
+          ProjectExec(p.output ++ replacedExprs.values, p)
+        // Output of SerializeFromObjectExec's child and output of DeserializeToObjectExec must be
+        // a single-field row.
+        case p @ (_: SerializeFromObjectExec | _: DeserializeToObjectExec) =>
+          ProjectExec(p.output ++ replacedExprs.values, p)
+        case p: ProjectExec =>
+          p.copy(
+            projectList = p.projectList ++ replacedExprs.values.toSeq.map(_.toAttribute),
+            child = addMetadataCol(p.child, replacedExprs))
+        case u @ UnionExec(children) =>
+          val newFirstChild = addMetadataCol(children.head, replacedExprs)
+          val newOtherChildren = children.tail.map {
+            child =>
+              // Make sure exprId is unique in each child of Union.
+              val newReplacedExprs = replacedExprs.map {
+                expr => (expr._1, Alias(expr._2.child, expr._2.name)())
+              }
+              addMetadataCol(child, newReplacedExprs)
+          }
+          u.copy(children = newFirstChild +: newOtherChildren)
+        case p => p.withNewChildren(p.children.map(child => addMetadataCol(child, replacedExprs)))
+      }
+  }
+
+  object PostOffload extends Rule[SparkPlan] {
+    override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
+      case p @ ProjectExec(projectList, child: FileSourceScanExecTransformer)
+          if projectList.exists(containsInputFileRelatedExpr) =>
+        child.copy(output = p.output)
+      case p @ ProjectExec(projectList, child: BatchScanExecTransformer)
+          if projectList.exists(containsInputFileRelatedExpr) =>
+        child.copy(output = p.output.asInstanceOf[Seq[AttributeReference]])
+    }
+  }
+}
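
How the two new rules compose, sketched as illustrative plan shapes for the
union-all query in the new test (hand-drawn, not taken from a real explain
output; operator names abbreviated):

    Before PreOffload, the expression is evaluated far from the scans:

        Project [input_file_name(), a]
        +- Union
           :- Scan json_table
           +- Scan lineitem

    After PreOffload, a new project directly above each leaf evaluates the
    expression while the scan's thread local is still valid; each union
    child gets its own exprId:

        Project [input_file_name#1, a]
        +- Union
           :- Project [a, input_file_name() AS input_file_name#1]
           :  +- Scan json_table
           +- Project [a, input_file_name() AS input_file_name#2]
              +- Scan lineitem

    After PostOffload, where a scan was offloaded, the pushed-down project
    is folded into the transformer scan, which emits the file name as a
    native metadata column:

        Project [input_file_name#1, a]
        +- Union
           :- FileSourceScanExecTransformer json_table [a, input_file_name#1]
           +- FileSourceScanExecTransformer lineitem [a, input_file_name#2]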

