This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6e6aa2e0f527 [SPARK-45632][SQL] Table cache should avoid unnecessary
ColumnarToRow when AQE is enabled
6e6aa2e0f527 is described below
commit 6e6aa2e0f5271ef764080e4b6cece2821f68bfa1
Author: ulysses-you <[email protected]>
AuthorDate: Tue Oct 24 14:14:03 2023 +0800
[SPARK-45632][SQL] Table cache should avoid unnecessary ColumnarToRow when
AQE is enabled
### What changes were proposed in this pull request?
If the cache serializer supports columnar input, then we do not need a
`ColumnarToRow` before caching the data. This PR improves the optimization when
AQE is enabled.
### Why are the changes needed?
Avoid unnecessary ColumnarToRow and make `CachedBatchSerializer` use
`convertColumnarBatchToCachedBatch` to cache data.
### Does this PR introduce _any_ user-facing change?
No, the default built-in cache serializer does not support columnar input.
### How was this patch tested?
Added a test.
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #43484 from ulysses-you/columnar-cache.
Authored-by: ulysses-you <[email protected]>
Signed-off-by: Xiduo You <[email protected]>
---
.../sql/execution/columnar/InMemoryRelation.scala | 6 +++++
.../columnar/CachedBatchSerializerSuite.scala | 30 +++++++++++++++++++++-
2 files changed, 35 insertions(+), 1 deletion(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 27860f23d9b5..4758d739d8c2 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.columnar.{CachedBatch, CachedBatchSerializer,
SimpleMetricsCachedBatch, SimpleMetricsCachedBatchSerializer}
import org.apache.spark.sql.execution.{ColumnarToRowTransition, InputAdapter,
QueryExecution, SparkPlan, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector,
OnHeapColumnVector, WritableColumnVector}
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.types.{BooleanType, ByteType, DoubleType,
FloatType, IntegerType, LongType, ShortType, StructType, UserDefinedType}
@@ -327,6 +328,11 @@ object InMemoryRelation {
}
case c2r: ColumnarToRowTransition => // This matches when whole stage code
gen is disabled.
c2r.child
+ case adaptive: AdaptiveSparkPlanExec =>
+ // If AQE is enabled for cached plan and table cache supports columnar
in, we should mark
+ // `AdaptiveSparkPlanExec.supportsColumnar` as true to avoid inserting
`ColumnarToRow`, so
+ // that `CachedBatchSerializer` can use
`convertColumnarBatchToCachedBatch` to cache data.
+ adaptive.copy(supportsColumnar = true)
case _ => plan
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/CachedBatchSerializerSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/CachedBatchSerializerSuite.scala
index 604ee1f7ace9..46f60e881ddb 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/CachedBatchSerializerSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/CachedBatchSerializerSuite.scala
@@ -25,6 +25,8 @@ import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression,
UnsafeProjection}
import org.apache.spark.sql.columnar.{CachedBatch, CachedBatchSerializer}
+import org.apache.spark.sql.execution.ColumnarToRowExec
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec,
AdaptiveSparkPlanHelper}
import org.apache.spark.sql.execution.columnar.InMemoryRelation.clearSerializer
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
@@ -120,7 +122,8 @@ class TestSingleIntColumnarCachedBatchSerializer extends
CachedBatchSerializer {
}
}
-class CachedBatchSerializerSuite extends QueryTest with SharedSparkSession {
+class CachedBatchSerializerSuite extends QueryTest
+ with SharedSparkSession with AdaptiveSparkPlanHelper {
import testImplicits._
override protected def sparkConf: SparkConf = {
@@ -151,4 +154,29 @@ class CachedBatchSerializerSuite extends QueryTest with
SharedSparkSession {
checkAnswer(df, Row(100) :: Row(200) :: Row(300) :: Row(100) :: Row(200)
:: Row(300) :: Nil)
}
}
+
+ test("SPARK-45632: Table cache should avoid unnecessary ColumnarToRow when
enable AQE") {
+ withTempPath { workDir =>
+ val workDirPath = workDir.getAbsolutePath
+ Seq(100, 200, 300).toDF("c").write.parquet(workDirPath)
+ withSQLConf(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") {
+ val df = spark.read.parquet(workDirPath).cache()
+ assert(df.count() == 3)
+
+ val finalPlan = df.queryExecution.executedPlan
+ val tableCacheOpt = find(finalPlan) {
+ case i: InMemoryTableScanExec if
i.relation.cacheBuilder.supportsColumnarInput => true
+ case _ => false
+ }
+ assert(tableCacheOpt.isDefined)
+ val tableCache =
tableCacheOpt.get.asInstanceOf[InMemoryTableScanExec].relation.cachedPlan
+ assert(tableCache.isInstanceOf[AdaptiveSparkPlanExec])
+ assert(tableCache.asInstanceOf[AdaptiveSparkPlanExec].supportsColumnar)
+ assert(collect(tableCache) {
+ case _: ColumnarToRowExec => true
+ }.isEmpty)
+ df.unpersist()
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]