This is an automated email from the ASF dual-hosted git repository.
taiyangli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 4ed9e2e57a [GLUTEN-9055][CH] Fix input_file_name diff from hive text
table (#9142)
4ed9e2e57a is described below
commit 4ed9e2e57a5613e3d3eecd77e3f203795a89b1fb
Author: 李扬 <[email protected]>
AuthorDate: Mon Mar 31 14:53:28 2025 +0800
[GLUTEN-9055][CH] Fix input_file_name diff from hive text table (#9142)
* fix input_file_name diff
* add uts
* fix failed uts
* fix failed uts
---
.../hive/GlutenClickHouseHiveTableSuite.scala | 58 ++++++++++++++++++++++
.../columnar/PushDownInputFileExpression.scala | 9 ++++
.../hive/execution/AbstractHiveTableScanExec.scala | 2 +-
.../hive/execution/AbstractHiveTableScanExec.scala | 2 +-
.../hive/execution/AbstractHiveTableScanExec.scala | 2 +-
.../hive/execution/AbstractHiveTableScanExec.scala | 2 +-
6 files changed, 71 insertions(+), 4 deletions(-)
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala
index a64ac191a9..4d8b66b4c5 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala
@@ -1644,4 +1644,62 @@ class GlutenClickHouseHiveTableSuite
}
}
+ test("test input_file_name() in different formats") {
+ val formats = Seq("textfile", "orc", "parquet")
+ val tableNamePrefix = "sales_"
+
+ formats.foreach {
+ format =>
+ val tableName = s"$tableNamePrefix${format.take(2)}"
+ val createTableSql =
+ s"""
+ |CREATE TABLE $tableName (
+ | product_id STRING,
+ | quantity INT
+ |) PARTITIONED BY (year STRING)
+ |STORED AS $format
+ |""".stripMargin
+
+ val insertDataSql1 =
+ s"""
+ |INSERT INTO $tableName PARTITION(year='2001')
+ |SELECT 'prod1', 100
+ |""".stripMargin
+
+ val insertDataSql2 =
+ s"""
+ |INSERT INTO $tableName PARTITION(year='2002')
+ |SELECT 'prod1', 200
+ |""".stripMargin
+
+ val select1Sql = s"SELECT input_file_name() from $tableName"
+ val select2Sql = s"SELECT input_file_block_start(), " +
+ s"input_file_block_length() FROM $tableName"
+ val dropSql = s"DROP TABLE IF EXISTS $tableName"
+
+ spark.sql(createTableSql)
+ spark.sql(insertDataSql1)
+ spark.sql(insertDataSql2)
+
+ if (format.equals("textfile")) {
+ // When format is textfile, input_file_name() in vanilla returns
paths like 'file:/xxx'
+ // But in gluten it returns paths like 'file:///xxx'.
+ val result = spark.sql(select1Sql)
+ result
+ .collect()
+ .foreach(
+ row => {
+ assert(!row.isNullAt(0) && row.getString(0).nonEmpty)
+ })
+ } else {
+ compareResultsAgainstVanillaSpark(select1Sql, compareResult = true,
_ => {})
+ }
+
+ compareResultsAgainstVanillaSpark(select2Sql, compareResult = true, _
=> {})
+
+ spark.sql(dropSql)
+ }
+ }
+
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownInputFileExpression.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownInputFileExpression.scala
index 778bd62b6d..b419bd08f3 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownInputFileExpression.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownInputFileExpression.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias,
Attribute, AttributeRef
import org.apache.spark.sql.catalyst.optimizer.CollapseProjectShim
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{DeserializeToObjectExec, LeafExecNode,
ProjectExec, SerializeFromObjectExec, SparkPlan, UnionExec}
+import org.apache.spark.sql.hive.HiveTableScanExecTransformer
import scala.collection.mutable
@@ -116,6 +117,14 @@ object PushDownInputFileExpression {
case p @ ProjectExec(projectList, child: FileSourceScanExecTransformer)
if projectList.exists(containsInputFileRelatedExpr) =>
child.copy(output = p.output)
+ case p @ ProjectExec(projectList, child: HiveTableScanExecTransformer)
+ if projectList.exists(containsInputFileRelatedExpr) =>
+ child.copy(
+ requestedAttributes = p.output,
+ relation = child.relation,
+ partitionPruningPred = child.partitionPruningPred,
+ prunedOutput = child.prunedOutput
+ )(child.session)
case p @ ProjectExec(projectList, child: BatchScanExecTransformer)
if projectList.exists(containsInputFileRelatedExpr) =>
child.copy(output = p.output.asInstanceOf[Seq[AttributeReference]])
diff --git
a/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
b/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
index f38c85a49d..e1ab055e18 100644
---
a/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
+++
b/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
@@ -76,7 +76,7 @@ abstract private[hive] class AbstractHiveTableScanExec(
override val output: Seq[Attribute] = {
// Retrieve the original attributes based on expression ID so that
capitalization matches.
- requestedAttributes.map(originalAttributes).distinct
+ requestedAttributes.map(attr => originalAttributes.getOrElse(attr,
attr)).distinct
}
// Bind all partition key attribute references in the partition pruning
predicate for later
diff --git
a/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
b/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
index d9b6bb936f..52104f3eea 100644
---
a/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
+++
b/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
@@ -76,7 +76,7 @@ abstract private[hive] class AbstractHiveTableScanExec(
override val output: Seq[Attribute] = {
// Retrieve the original attributes based on expression ID so that
capitalization matches.
- requestedAttributes.map(originalAttributes).distinct
+ requestedAttributes.map(attr => originalAttributes.getOrElse(attr,
attr)).distinct
}
// Bind all partition key attribute references in the partition pruning
predicate for later
diff --git
a/shims/spark34/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
b/shims/spark34/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
index 3521d49654..48f5bb3cbd 100644
---
a/shims/spark34/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
+++
b/shims/spark34/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
@@ -78,7 +78,7 @@ abstract private[hive] class AbstractHiveTableScanExec(
override val output: Seq[Attribute] = {
// Retrieve the original attributes based on expression ID so that
capitalization matches.
- requestedAttributes.map(originalAttributes).distinct
+ requestedAttributes.map(attr => originalAttributes.getOrElse(attr,
attr)).distinct
}
// Bind all partition key attribute references in the partition pruning
predicate for later
diff --git
a/shims/spark35/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
b/shims/spark35/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
index 3521d49654..48f5bb3cbd 100644
---
a/shims/spark35/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
+++
b/shims/spark35/src/main/scala/org/apache/spark/sql/hive/execution/AbstractHiveTableScanExec.scala
@@ -78,7 +78,7 @@ abstract private[hive] class AbstractHiveTableScanExec(
override val output: Seq[Attribute] = {
// Retrieve the original attributes based on expression ID so that
capitalization matches.
- requestedAttributes.map(originalAttributes).distinct
+ requestedAttributes.map(attr => originalAttributes.getOrElse(attr,
attr)).distinct
}
// Bind all partition key attribute references in the partition pruning
predicate for later
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]