This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e6aa2e0f527 [SPARK-45632][SQL] Table cache should avoid unnecessary ColumnarToRow when AQE is enabled
6e6aa2e0f527 is described below

commit 6e6aa2e0f5271ef764080e4b6cece2821f68bfa1
Author: ulysses-you <[email protected]>
AuthorDate: Tue Oct 24 14:14:03 2023 +0800

    [SPARK-45632][SQL] Table cache should avoid unnecessary ColumnarToRow when AQE is enabled
    
    ### What changes were proposed in this pull request?
    
    If the cache serializer supports columnar input, then we do not need a `ColumnarToRow` before caching the data. This PR extends that optimization to the case where AQE is enabled.
    
    ### Why are the changes needed?
    
    Avoids an unnecessary `ColumnarToRow` and allows the `CachedBatchSerializer` to use `convertColumnarBatchToCachedBatch` to cache data.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. The default built-in cache serializer does not support columnar input.
    
    ### How was this patch tested?
    
    Added a new test.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    no
    
    Closes #43484 from ulysses-you/columnar-cache.
    
    Authored-by: ulysses-you <[email protected]>
    Signed-off-by: Xiduo You <[email protected]>
---
 .../sql/execution/columnar/InMemoryRelation.scala  |  6 +++++
 .../columnar/CachedBatchSerializerSuite.scala      | 30 +++++++++++++++++++++-
 2 files changed, 35 insertions(+), 1 deletion(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 27860f23d9b5..4758d739d8c2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.columnar.{CachedBatch, CachedBatchSerializer, 
SimpleMetricsCachedBatch, SimpleMetricsCachedBatchSerializer}
 import org.apache.spark.sql.execution.{ColumnarToRowTransition, InputAdapter, 
QueryExecution, SparkPlan, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, 
OnHeapColumnVector, WritableColumnVector}
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.types.{BooleanType, ByteType, DoubleType, 
FloatType, IntegerType, LongType, ShortType, StructType, UserDefinedType}
@@ -327,6 +328,11 @@ object InMemoryRelation {
     }
     case c2r: ColumnarToRowTransition => // This matches when whole stage code 
gen is disabled.
       c2r.child
+    case adaptive: AdaptiveSparkPlanExec =>
+      // If AQE is enabled for cached plan and table cache supports columnar 
in, we should mark
+      // `AdaptiveSparkPlanExec.supportsColumnar` as true to avoid inserting 
`ColumnarToRow`, so
+      // that `CachedBatchSerializer` can use 
`convertColumnarBatchToCachedBatch` to cache data.
+      adaptive.copy(supportsColumnar = true)
     case _ => plan
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/CachedBatchSerializerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/CachedBatchSerializerSuite.scala
index 604ee1f7ace9..46f60e881ddb 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/CachedBatchSerializerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/CachedBatchSerializerSuite.scala
@@ -25,6 +25,8 @@ import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, 
UnsafeProjection}
 import org.apache.spark.sql.columnar.{CachedBatch, CachedBatchSerializer}
+import org.apache.spark.sql.execution.ColumnarToRowExec
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, 
AdaptiveSparkPlanHelper}
 import org.apache.spark.sql.execution.columnar.InMemoryRelation.clearSerializer
 import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
@@ -120,7 +122,8 @@ class TestSingleIntColumnarCachedBatchSerializer extends 
CachedBatchSerializer {
   }
 }
 
-class CachedBatchSerializerSuite  extends QueryTest with SharedSparkSession {
+class CachedBatchSerializerSuite extends QueryTest
+  with SharedSparkSession with AdaptiveSparkPlanHelper {
   import testImplicits._
 
   override protected def sparkConf: SparkConf = {
@@ -151,4 +154,29 @@ class CachedBatchSerializerSuite  extends QueryTest with 
SharedSparkSession {
       checkAnswer(df, Row(100) :: Row(200) :: Row(300) :: Row(100) :: Row(200) 
:: Row(300) :: Nil)
     }
   }
+
+  test("SPARK-45632: Table cache should avoid unnecessary ColumnarToRow when 
enable AQE") {
+    withTempPath { workDir =>
+      val workDirPath = workDir.getAbsolutePath
+      Seq(100, 200, 300).toDF("c").write.parquet(workDirPath)
+      withSQLConf(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") {
+        val df = spark.read.parquet(workDirPath).cache()
+        assert(df.count() == 3)
+
+        val finalPlan = df.queryExecution.executedPlan
+        val tableCacheOpt = find(finalPlan) {
+          case i: InMemoryTableScanExec if 
i.relation.cacheBuilder.supportsColumnarInput => true
+          case _ => false
+        }
+        assert(tableCacheOpt.isDefined)
+        val tableCache = 
tableCacheOpt.get.asInstanceOf[InMemoryTableScanExec].relation.cachedPlan
+        assert(tableCache.isInstanceOf[AdaptiveSparkPlanExec])
+        assert(tableCache.asInstanceOf[AdaptiveSparkPlanExec].supportsColumnar)
+        assert(collect(tableCache) {
+          case _: ColumnarToRowExec => true
+        }.isEmpty)
+        df.unpersist()
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to