This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new f4bcefbe8b33 [SPARK-49352][SQL][3.5] Avoid redundant array transform for identical expression
f4bcefbe8b33 is described below

commit f4bcefbe8b33f6d8e64d2542eb69ea271e6a97c5
Author: Liang-Chi Hsieh <vii...@gmail.com>
AuthorDate: Fri Aug 23 22:26:39 2024 -0700

    [SPARK-49352][SQL][3.5] Avoid redundant array transform for identical expression

    ### What changes were proposed in this pull request?

    This patch avoids adding an `ArrayTransform` in the `resolveArrayType` function when the resolved element expression is identical to the input lambda parameter.

    ### Why are the changes needed?

    Our customer encountered a significant performance regression when migrating from Spark 3.2 to Spark 3.4 on an `Insert Into` query that is analyzed as an `AppendData` on an Iceberg table. We found that the root cause is that in Spark 3.4, `TableOutputResolver` resolves the query with an additional `ArrayTransform` on an `ArrayType` field. The `ArrayTransform`'s lambda function is actually the identity function, i.e., the transformation is redundant.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Unit test and manual e2e test

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #47863 from viirya/fix_redundant_array_transform_3.5.

    Authored-by: Liang-Chi Hsieh <vii...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../catalyst/analysis/TableOutputResolver.scala  | 12 ++++++--
 .../catalyst/analysis/V2WriteAnalysisSuite.scala | 32 +++++++++++++++++++++-
 2 files changed, 41 insertions(+), 3 deletions(-)
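To make the redundancy concrete before reading the patch, here is a minimal, self-contained sketch (illustrative only, not part of this commit) using the public `transform` function from `org.apache.spark.sql.functions`, which builds the same `ArrayTransform` expression the analyzer was inserting; the session setup is hypothetical:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.transform

// Hypothetical local session, just for demonstration.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("identity-transform-demo")
  .getOrCreate()
import spark.implicits._

val df = Seq(Seq(1L, 2L, 3L)).toDF("arr")

// Redundant: even an identity lambda forces a per-element pass over the array.
df.select(transform($"arr", x => x).as("arr")).explain()

// Same result, but the input array is reused directly with no per-element work.
df.select($"arr").explain()
```

Comparing the two `explain()` outputs shows the extra `transform(...)` expression in the first plan; the patch below makes the analyzer emit the second shape whenever the resolved element expression is the lambda parameter itself.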
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
index 42abc0eafda7..fabb5634ad10 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
@@ -371,8 +371,16 @@ object TableOutputResolver {
       resolveColumnsByPosition(tableName, Seq(param), Seq(fakeAttr), conf, addError, colPath)
     }
     if (res.length == 1) {
-      val func = LambdaFunction(res.head, Seq(param))
-      Some(Alias(ArrayTransform(nullCheckedInput, func), expected.name)())
+      if (res.head == param) {
+        // If the element type is the same, we can reuse the input array directly.
+        Some(
+          Alias(nullCheckedInput, expected.name)(
+            nonInheritableMetadataKeys =
+              Seq(CharVarcharUtils.CHAR_VARCHAR_TYPE_STRING_METADATA_KEY)))
+      } else {
+        val func = LambdaFunction(res.head, Seq(param))
+        Some(Alias(ArrayTransform(nullCheckedInput, func), expected.name)())
+      }
     } else {
       None
     }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala
index d91a080d8fe8..21a049e91418 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala
@@ -22,7 +22,7 @@ import java.util.Locale
 import org.apache.spark.QueryContext
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, CreateNamedStruct, GetStructField, If, IsNull, LessThanOrEqual, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, ArrayTransform, AttributeReference, Cast, CreateNamedStruct, GetStructField, If, IsNull, LessThanOrEqual, Literal}
 import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -304,6 +304,36 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest {
 
   def byPosition(table: NamedRelation, query: LogicalPlan): LogicalPlan
 
+  test("SPARK-49352: Avoid redundant array transform for identical expression") {
+    def assertArrayField(fromType: ArrayType, toType: ArrayType, hasTransform: Boolean): Unit = {
+      val table = TestRelation(Seq($"a".int, $"arr".array(toType)))
+      val query = TestRelation(Seq($"arr".array(fromType), $"a".int))
+
+      val writePlan = byName(table, query).analyze
+
+      assertResolved(writePlan)
+      checkAnalysis(writePlan, writePlan)
+
+      val transform = writePlan.children.head.expressions.exists { e =>
+        e.find {
+          case _: ArrayTransform => true
+          case _ => false
+        }.isDefined
+      }
+      if (hasTransform) {
+        assert(transform)
+      } else {
+        assert(!transform)
+      }
+    }
+
+    assertArrayField(ArrayType(LongType), ArrayType(LongType), hasTransform = false)
+    assertArrayField(
+      ArrayType(new StructType().add("x", "int").add("y", "int")),
+      ArrayType(new StructType().add("y", "int").add("x", "byte")),
+      hasTransform = true)
+  }
+
   test("SPARK-33136: output resolved on complex types for V2 write commands") {
     def assertTypeCompatibility(name: String, fromType: DataType, toType: DataType): Unit = {
       val table = TestRelation(StructType(Seq(StructField("a", toType))))
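For readers skimming the diff, the fix reduces to the following simplified sketch; the helper name and parameter names are illustrative, not from the patch, and the surrounding resolution machinery is omitted:

```scala
import org.apache.spark.sql.catalyst.expressions.{
  ArrayTransform, Expression, LambdaFunction, NamedExpression}

// Illustrative helper: if resolving the array's element expression returned the
// lambda parameter unchanged, the lambda would be the identity function, and the
// ArrayTransform would only copy the array element by element. In that case the
// input expression can be reused directly.
def elideIdentityTransform(
    input: Expression,
    param: NamedExpression,
    resolved: Expression): Expression = {
  if (resolved == param) {
    input // identity lambda: skip the redundant transform
  } else {
    ArrayTransform(input, LambdaFunction(resolved, Seq(param)))
  }
}
```

One detail the sketch omits: the actual patch passes `CharVarcharUtils.CHAR_VARCHAR_TYPE_STRING_METADATA_KEY` as a non-inheritable metadata key on the new `Alias`, so that internal char/varchar type metadata carried by the input attribute does not propagate through the alias.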