This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new f4bcefbe8b33 [SPARK-49352][SQL][3.5] Avoid redundant array transform for identical expression
f4bcefbe8b33 is described below

commit f4bcefbe8b33f6d8e64d2542eb69ea271e6a97c5
Author: Liang-Chi Hsieh <vii...@gmail.com>
AuthorDate: Fri Aug 23 22:26:39 2024 -0700

    [SPARK-49352][SQL][3.5] Avoid redundant array transform for identical expression
    
    ### What changes were proposed in this pull request?
    
    This patch avoids adding an `ArrayTransform` in the `resolveArrayType` function if the resolved expression is the same as the input parameter.
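
    Conceptually, the resolver now short-circuits the identity case. A simplified
    sketch of the new branch (metadata-key handling omitted; see the diff below
    for the exact code):

    ```scala
    if (res.head == param) {
      // The resolved element expression is the lambda parameter itself, so the
      // lambda would be the identity function: alias the input array through
      // unchanged instead of wrapping it in an ArrayTransform.
      Some(Alias(nullCheckedInput, expected.name)())
    } else {
      // The element types differ: a per-element conversion is still required.
      val func = LambdaFunction(res.head, Seq(param))
      Some(Alias(ArrayTransform(nullCheckedInput, func), expected.name)())
    }
    ```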
    
    ### Why are the changes needed?
    
    Our customer encountered a significant performance regression when migrating from Spark 3.2 to Spark 3.4 on an `INSERT INTO` query that is analyzed as an `AppendData` operation on an Iceberg table.
    We found that the root cause is that in Spark 3.4, `TableOutputResolver` resolves the query with an additional `ArrayTransform` on an `ArrayType` field. The `ArrayTransform`'s lambda function is actually an identity function, i.e., the transformation is redundant.
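
    For illustration, a minimal repro sketch (hypothetical: the table and catalog
    names are assumptions, not taken from the patch, and an Iceberg-enabled
    `spark-shell` with the catalog configured is required):

    ```scala
    // The written array's element type matches the target column exactly,
    // so no per-element conversion is needed.
    spark.sql("CREATE TABLE local.db.t (arr ARRAY<BIGINT>) USING iceberg")

    // Before this fix, the analyzed AppendData plan still wrapped the column in
    // transform(arr, x -> x), an identity lambda evaluated for every row.
    spark.sql("EXPLAIN EXTENDED INSERT INTO local.db.t SELECT array(1L, 2L, 3L)").show(truncate = false)
    ```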
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Unit test and manual e2e test
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #47863 from viirya/fix_redundant_array_transform_3.5.
    
    Authored-by: Liang-Chi Hsieh <vii...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../catalyst/analysis/TableOutputResolver.scala    | 12 ++++++--
 .../catalyst/analysis/V2WriteAnalysisSuite.scala   | 32 +++++++++++++++++++++-
 2 files changed, 41 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
index 42abc0eafda7..fabb5634ad10 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
@@ -371,8 +371,16 @@ object TableOutputResolver {
      resolveColumnsByPosition(tableName, Seq(param), Seq(fakeAttr), conf, addError, colPath)
     }
     if (res.length == 1) {
-      val func = LambdaFunction(res.head, Seq(param))
-      Some(Alias(ArrayTransform(nullCheckedInput, func), expected.name)())
+      if (res.head == param) {
+        // If the element type is the same, we can reuse the input array directly.
+        Some(
+          Alias(nullCheckedInput, expected.name)(
+            nonInheritableMetadataKeys =
+              Seq(CharVarcharUtils.CHAR_VARCHAR_TYPE_STRING_METADATA_KEY)))
+      } else {
+        val func = LambdaFunction(res.head, Seq(param))
+        Some(Alias(ArrayTransform(nullCheckedInput, func), expected.name)())
+      }
     } else {
       None
     }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala
index d91a080d8fe8..21a049e91418 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala
@@ -22,7 +22,7 @@ import java.util.Locale
 import org.apache.spark.QueryContext
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, CreateNamedStruct, GetStructField, If, IsNull, LessThanOrEqual, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, ArrayTransform, AttributeReference, Cast, CreateNamedStruct, GetStructField, If, IsNull, LessThanOrEqual, Literal}
 import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -304,6 +304,36 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest {
 
   def byPosition(table: NamedRelation, query: LogicalPlan): LogicalPlan
 
+  test("SPARK-49352: Avoid redundant array transform for identical expression") {
+    def assertArrayField(fromType: ArrayType, toType: ArrayType, hasTransform: Boolean): Unit = {
+      val table = TestRelation(Seq($"a".int, $"arr".array(toType)))
+      val query = TestRelation(Seq($"arr".array(fromType), $"a".int))
+
+      val writePlan = byName(table, query).analyze
+
+      assertResolved(writePlan)
+      checkAnalysis(writePlan, writePlan)
+
+      val transform = writePlan.children.head.expressions.exists { e =>
+        e.find {
+          case _: ArrayTransform => true
+          case _ => false
+        }.isDefined
+      }
+      if (hasTransform) {
+        assert(transform)
+      } else {
+        assert(!transform)
+      }
+    }
+
+    assertArrayField(ArrayType(LongType), ArrayType(LongType), hasTransform = false)
+    assertArrayField(
+      ArrayType(new StructType().add("x", "int").add("y", "int")),
+      ArrayType(new StructType().add("y", "int").add("x", "byte")),
+      hasTransform = true)
+  }
+
   test("SPARK-33136: output resolved on complex types for V2 write commands") {
    def assertTypeCompatibility(name: String, fromType: DataType, toType: DataType): Unit = {
       val table = TestRelation(StructType(Seq(StructField("a", toType))))

