This is an automated email from the ASF dual-hosted git repository.

taiyangli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 4ed9e2e57a [GLUTEN-9055][CH] Fix input_file_name diff from hive text 
table (#9142)
4ed9e2e57a is described below

commit 4ed9e2e57a5613e3d3eecd77e3f203795a89b1fb
Author: 李扬 <[email protected]>
AuthorDate: Mon Mar 31 14:53:28 2025 +0800

    [GLUTEN-9055][CH] Fix input_file_name diff from hive text table (#9142)
    
    * fix input_file_name diff
    
    * add uts
    
    * fix failed uts
    
    * fix failed uts
---
 .../hive/GlutenClickHouseHiveTableSuite.scala      | 58 ++++++++++++++++++++++
 .../columnar/PushDownInputFileExpression.scala     |  9 ++++
 .../hive/execution/AbstractHiveTableScanExec.scala |  2 +-
 .../hive/execution/AbstractHiveTableScanExec.scala |  2 +-
 .../hive/execution/AbstractHiveTableScanExec.scala |  2 +-
 .../hive/execution/AbstractHiveTableScanExec.scala |  2 +-
 6 files changed, 71 insertions(+), 4 deletions(-)

diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala
index a64ac191a9..4d8b66b4c5 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala
@@ -1644,4 +1644,62 @@ class GlutenClickHouseHiveTableSuite
     }
   }
 
+  test("test input_file_name() in different formats") {
+    val formats = Seq("textfile", "orc", "parquet")
+    val tableNamePrefix = "sales_"
+
+    formats.foreach {
+      format =>
+        val tableName = s"$tableNamePrefix${format.take(2)}"
+        val createTableSql =
+          s"""
+             |CREATE TABLE $tableName (
+             |  product_id STRING,
+             |  quantity INT
+             |) PARTITIONED BY (year STRING)
+             |STORED AS $format
+             |""".stripMargin
+
+        val insertDataSql1 =
+          s"""
+             |INSERT INTO $tableName PARTITION(year='2001')
+             |SELECT 'prod1', 100
+             |""".stripMargin
+
+        val insertDataSql2 =
+          s"""
+             |INSERT INTO $tableName PARTITION(year='2002')
+             |SELECT 'prod1', 200
+             |""".stripMargin
+
+        val select1Sql = s"SELECT input_file_name() from $tableName"
+        val select2Sql = s"SELECT input_file_block_start(), " +
+          s"input_file_block_length() FROM $tableName"
+        s"input_file_block_length() FROM $tableName"
+        val dropSql = s"DROP TABLE IF EXISTS $tableName"
+
+        spark.sql(createTableSql)
+        spark.sql(insertDataSql1)
+        spark.sql(insertDataSql2)
+
+        if (format.equals("textfile")) {
+          // When format is textfile, input_file_name() in vanilla returns 
paths like 'file:/xxx'
+          // But in gluten it returns paths like 'file:///xxx'.
+          val result = spark.sql(select1Sql)
+          result
+            .collect()
+            .foreach(
+              row => {
+                assert(!row.isNullAt(0) && row.getString(0).nonEmpty)
+              })
+        } else {
+          compareResultsAgainstVanillaSpark(select1Sql, compareResult = true, 
_ => {})
+        }
+
+        compareResultsAgainstVanillaSpark(select2Sql, compareResult = true, _ 
=> {})
+
+        spark.sql(dropSql)
+    }
+  }
+
 }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownInputFileExpression.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownInputFileExpression.scala
index 778bd62b6d..b419bd08f3 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownInputFileExpression.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownInputFileExpression.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, 
Attribute, AttributeRef
 import org.apache.spark.sql.catalyst.optimizer.CollapseProjectShim
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{DeserializeToObjectExec, LeafExecNode, 
ProjectExec, SerializeFromObjectExec, SparkPlan, UnionExec}
+import org.apache.spark.sql.hive.HiveTableScanExecTransformer
 
 import scala.collection.mutable
 
@@ -116,6 +117,14 @@ object PushDownInputFileExpression {
       case p @ ProjectExec(projectList, child: FileSourceScanExecTransformer)
           if projectList.exists(containsInputFileRelatedExpr) =>
         child.copy(output = p.output)
+      case p @ ProjectExec(projectList, child: HiveTableScanExecTransformer)
+          if projectList.exists(containsInputFileRelatedExpr) =>
+        child.copy(
+          requestedAttributes = p.output,
+          relation = child.relation,
+          partitionPruningPred = child.partitionPruningPred,
+          prunedOutput = child.prunedOutput
+        )(child.session)
       case p @ ProjectExec(projectList, child: BatchScanExecTransformer)
           if projectList.exists(containsInputFileRelatedExpr) =>
         child.copy(output = p.output.asInstanceOf[Seq[AttributeReference]])
diff --git 
a/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
 
b/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
index f38c85a49d..e1ab055e18 100644
--- 
a/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
+++ 
b/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
@@ -76,7 +76,7 @@ abstract private[hive] class AbstractHiveTableScanExec(
 
   override val output: Seq[Attribute] = {
     // Retrieve the original attributes based on expression ID so that 
capitalization matches.
-    requestedAttributes.map(originalAttributes).distinct
+    requestedAttributes.map(attr => originalAttributes.getOrElse(attr, 
attr)).distinct
   }
 
   // Bind all partition key attribute references in the partition pruning 
predicate for later
diff --git 
a/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
 
b/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
index d9b6bb936f..52104f3eea 100644
--- 
a/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
+++ 
b/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
@@ -76,7 +76,7 @@ abstract private[hive] class AbstractHiveTableScanExec(
 
   override val output: Seq[Attribute] = {
     // Retrieve the original attributes based on expression ID so that 
capitalization matches.
-    requestedAttributes.map(originalAttributes).distinct
+    requestedAttributes.map(attr => originalAttributes.getOrElse(attr, 
attr)).distinct
   }
 
   // Bind all partition key attribute references in the partition pruning 
predicate for later
diff --git 
a/shims/spark34/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
 
b/shims/spark34/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
index 3521d49654..48f5bb3cbd 100644
--- 
a/shims/spark34/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
+++ 
b/shims/spark34/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
@@ -78,7 +78,7 @@ abstract private[hive] class AbstractHiveTableScanExec(
 
   override val output: Seq[Attribute] = {
     // Retrieve the original attributes based on expression ID so that 
capitalization matches.
-    requestedAttributes.map(originalAttributes).distinct
+    requestedAttributes.map(attr => originalAttributes.getOrElse(attr, 
attr)).distinct
   }
 
   // Bind all partition key attribute references in the partition pruning 
predicate for later
diff --git 
a/shims/spark35/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
 
b/shims/spark35/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
index 3521d49654..48f5bb3cbd 100644
--- 
a/shims/spark35/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
+++ 
b/shims/spark35/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
@@ -78,7 +78,7 @@ abstract private[hive] class AbstractHiveTableScanExec(
 
   override val output: Seq[Attribute] = {
     // Retrieve the original attributes based on expression ID so that 
capitalization matches.
-    requestedAttributes.map(originalAttributes).distinct
+    requestedAttributes.map(attr => originalAttributes.getOrElse(attr, 
attr)).distinct
   }
 
   // Bind all partition key attribute references in the partition pruning 
predicate for later


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to