This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8c63485189c [SPARK-46380][SQL][FOLLOWUP] Simplify the code for
ResolveInlineTables and ResolveInlineTablesSuite
8c63485189c is described below
commit 8c63485189c87fd0a11b57c1a4c8ffa517f5f64e
Author: Jiaan Geng <[email protected]>
AuthorDate: Fri Dec 22 09:49:55 2023 +0800
[SPARK-46380][SQL][FOLLOWUP] Simplify the code for ResolveInlineTables and
ResolveInlineTablesSuite
### What changes were proposed in this pull request?
https://github.com/apache/spark/pull/44316 replaced the current time/date prior
to evaluating inline table expressions.
This PR proposes to simplify the code for `ResolveInlineTables` and to let
`ResolveInlineTablesSuite` apply the rule `ResolveInlineTables` directly.
### Why are the changes needed?
Simplify the code for `ResolveInlineTables` and `ResolveInlineTablesSuite`.
### Does this PR introduce _any_ user-facing change?
'No'.
### How was this patch tested?
Test cases updated.
GA tests.
### Was this patch authored or co-authored using generative AI tooling?
'No'.
Closes #44447 from beliefer/SPARK-46380_followup.
Authored-by: Jiaan Geng <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../catalyst/analysis/ResolveInlineTables.scala | 28 +++++++++-------------
.../analysis/ResolveInlineTablesSuite.scala | 12 +++++-----
2 files changed, 17 insertions(+), 23 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
index 73600f5c706..811e02b4d97 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
@@ -95,30 +95,24 @@ object ResolveInlineTables extends Rule[LogicalPlan]
private[analysis] def findCommonTypesAndCast(table: UnresolvedInlineTable):
ResolvedInlineTable = {
// For each column, traverse all the values and find a common data type
and nullability.
- val fields = table.rows.transpose.zip(table.names).map { case (column,
name) =>
+ val (fields, columns) = table.rows.transpose.zip(table.names).map { case
(column, name) =>
val inputTypes = column.map(_.dataType)
val tpe =
TypeCoercion.findWiderTypeWithoutStringPromotion(inputTypes).getOrElse {
table.failAnalysis(
errorClass =
"INVALID_INLINE_TABLE.INCOMPATIBLE_TYPES_IN_INLINE_TABLE",
messageParameters = Map("colName" -> toSQLId(name)))
}
- StructField(name, tpe, nullable = column.exists(_.nullable))
- }
- val attributes = DataTypeUtils.toAttributes(StructType(fields))
- assert(fields.size == table.names.size)
-
- val castedRows: Seq[Seq[Expression]] = table.rows.map { row =>
- row.zipWithIndex.map {
- case (e, ci) =>
- val targetType = fields(ci).dataType
- val castedExpr = if (DataTypeUtils.sameType(e.dataType, targetType))
{
- e
- } else {
- cast(e, targetType)
- }
- castedExpr
+ val newColumn = column.map {
+ case expr if DataTypeUtils.sameType(expr.dataType, tpe) =>
+ expr
+ case expr =>
+ cast(expr, tpe)
}
- }
+ (StructField(name, tpe, nullable = column.exists(_.nullable)), newColumn)
+ }.unzip
+ assert(fields.size == table.names.size)
+ val attributes = DataTypeUtils.toAttributes(StructType(fields))
+ val castedRows: Seq[Seq[Expression]] = columns.transpose
ResolvedInlineTable(castedRows, attributes)
}
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala
index 758b6b73e4e..3e014d1c11d 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala
@@ -86,8 +86,9 @@ class ResolveInlineTablesSuite extends AnalysisTest with
BeforeAndAfter {
test("cast and execute") {
val table = UnresolvedInlineTable(Seq("c1"), Seq(Seq(lit(1)),
Seq(lit(2L))))
- val resolved = ResolveInlineTables.findCommonTypesAndCast(table)
- val converted =
ResolveInlineTables.earlyEvalIfPossible(resolved).asInstanceOf[LocalRelation]
+ val resolved = ResolveInlineTables(table)
+ assert(resolved.isInstanceOf[LocalRelation])
+ val converted = resolved.asInstanceOf[LocalRelation]
assert(converted.output.map(_.dataType) == Seq(LongType))
assert(converted.data.size == 2)
@@ -98,12 +99,11 @@ class ResolveInlineTablesSuite extends AnalysisTest with
BeforeAndAfter {
test("cast and execute CURRENT_LIKE expressions") {
val table = UnresolvedInlineTable(Seq("c1"), Seq(
Seq(CurrentTimestamp()), Seq(CurrentTimestamp())))
- val casted = ResolveInlineTables.findCommonTypesAndCast(table)
- val earlyEval = ResolveInlineTables.earlyEvalIfPossible(casted)
+ val resolved = ResolveInlineTables(table)
// Early eval should keep it in expression form.
- assert(earlyEval.isInstanceOf[ResolvedInlineTable])
+ assert(resolved.isInstanceOf[ResolvedInlineTable])
- EvalInlineTables(ComputeCurrentTime(earlyEval)) match {
+ EvalInlineTables(ComputeCurrentTime(resolved)) match {
case LocalRelation(output, data, _) =>
assert(output.map(_.dataType) == Seq(TimestampType))
assert(data.size == 2)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]