This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new b53c170679f [SPARK-45592][SPARK-45282][SQL] Correctness issue in AQE
with InMemoryTableScanExec
b53c170679f is described below
commit b53c170679f5a9a0214095cd345da6f33e09a239
Author: Maryann Xue <[email protected]>
AuthorDate: Tue Nov 14 08:51:26 2023 -0800
[SPARK-45592][SPARK-45282][SQL] Correctness issue in AQE with
InMemoryTableScanExec
This PR fixes a correctness issue that occurs when AQE is enabled for SQL cache. The
issue was caused by AQE coalescing the top-level shuffle in the physical plan
of InMemoryTableScan while still reporting the output partitioning of that
InMemoryTableScan as HashPartitioning, as if it had not been coalesced. As a
result, the caller query of that InMemoryTableScan failed to align the partitions
correctly and produced incorrect join results.
The fix addresses the issue by disabling coalescing in InMemoryTableScan
for shuffles in the final stage. This fix also guarantees that enabling AQE for
SQL cache, compared with disabling it, would always be a performance win, since AQE
optimizations are applied to all non-top-level stages while no extra
shuffle is introduced between the parent query and the cached relation
(if coalescing in top-level shuffles of InMemoryTableScan were not disabled, an
extra shuffle would end up being added [...]
To fix the correctness issue and to avoid potential AQE performance regressions in
queries that use SQL cache.
No.
Added unit tests.
No.
Closes #43760 from maryannxue/spark-45592.
Authored-by: Maryann Xue <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 128f5523194d5241c7b0f08b5be183288128ba16)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../org/apache/spark/sql/internal/SQLConf.scala | 9 ++++
.../apache/spark/sql/execution/CacheManager.scala | 3 +-
.../execution/adaptive/AdaptiveSparkPlanExec.scala | 8 +++-
.../org/apache/spark/sql/CachedTableSuite.scala | 52 +++++++++++++++-------
4 files changed, 55 insertions(+), 17 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6578fa16439..951a54a15cb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -646,6 +646,15 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val ADAPTIVE_EXECUTION_APPLY_FINAL_STAGE_SHUFFLE_OPTIMIZATIONS =
+ buildConf("spark.sql.adaptive.applyFinalStageShuffleOptimizations")
+ .internal()
+ .doc("Configures whether adaptive query execution (if enabled) should
apply shuffle " +
+ "coalescing and local shuffle read optimization for the final query
stage.")
+ .version("3.4.2")
+ .booleanConf
+ .createWithDefault(true)
+
val ADAPTIVE_EXECUTION_LOG_LEVEL = buildConf("spark.sql.adaptive.logLevel")
.internal()
.doc("Configures the log level for adaptive execution logging of plan
changes. The value " +
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index b1153d7a1e8..4a2fe74b853 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -379,7 +379,8 @@ class CacheManager extends Logging with
AdaptiveSparkPlanHelper {
*/
private def getOrCloneSessionWithConfigsOff(session: SparkSession):
SparkSession = {
if (session.conf.get(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING)) {
- session
+ SparkSession.getOrCloneSessionWithConfigsOff(session,
+ SQLConf.ADAPTIVE_EXECUTION_APPLY_FINAL_STAGE_SHUFFLE_OPTIMIZATIONS ::
Nil)
} else {
SparkSession.getOrCloneSessionWithConfigsOff(session,
forceDisableConfigs)
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 395e5468b64..f859e17d937 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -153,7 +153,13 @@ case class AdaptiveSparkPlanExec(
)
private def optimizeQueryStage(plan: SparkPlan, isFinalStage: Boolean):
SparkPlan = {
- val optimized = queryStageOptimizerRules.foldLeft(plan) { case
(latestPlan, rule) =>
+ val rules = if (isFinalStage &&
+
!conf.getConf(SQLConf.ADAPTIVE_EXECUTION_APPLY_FINAL_STAGE_SHUFFLE_OPTIMIZATIONS))
{
+ queryStageOptimizerRules.filterNot(_.isInstanceOf[AQEShuffleReadRule])
+ } else {
+ queryStageOptimizerRules
+ }
+ val optimized = rules.foldLeft(plan) { case (latestPlan, rule) =>
val applied = rule.apply(latestPlan)
val result = rule match {
case _: AQEShuffleReadRule if !applied.fastEquals(latestPlan) =>
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 166c271f662..cf40e944c09 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -29,7 +29,7 @@ import org.apache.commons.io.FileUtils
import org.apache.spark.CleanerListener
import org.apache.spark.executor.DataReadMethod._
import org.apache.spark.executor.DataReadMethod.DataReadMethod
-import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent,
SparkListenerJobStart}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
@@ -39,6 +39,7 @@ import org.apache.spark.sql.execution.{ColumnarToRowExec,
ExecSubqueryExpression
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.columnar._
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import
org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
@@ -1608,23 +1609,44 @@ class CachedTableSuite extends QueryTest with
SQLTestUtils
SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
- withTempView("t1", "t2", "t3") {
- withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key ->
"false") {
- sql("CACHE TABLE t1 as SELECT /*+ REPARTITION */ * FROM values(1) as
t(c)")
- assert(spark.table("t1").rdd.partitions.length == 2)
+ var finalPlan = ""
+ val listener = new SparkListener {
+ override def onOtherEvent(event: SparkListenerEvent): Unit = {
+ event match {
+ case SparkListenerSQLAdaptiveExecutionUpdate(_, physicalPlanDesc,
sparkPlanInfo) =>
+ if (sparkPlanInfo.simpleString.startsWith(
+ "AdaptiveSparkPlan isFinalPlan=true")) {
+ finalPlan = physicalPlanDesc
+ }
+ case _ => // ignore other events
+ }
}
+ }
- withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key ->
"true") {
- assert(spark.table("t1").rdd.partitions.length == 2)
- sql("CACHE TABLE t2 as SELECT /*+ REPARTITION */ * FROM values(2) as
t(c)")
- assert(spark.table("t2").rdd.partitions.length == 1)
- }
+ withTempView("t0", "t1", "t2") {
+ try {
+ spark.range(10).write.saveAsTable("t0")
+ spark.sparkContext.listenerBus.waitUntilEmpty()
+ spark.sparkContext.addSparkListener(listener)
- withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key ->
"false") {
- assert(spark.table("t1").rdd.partitions.length == 2)
- assert(spark.table("t2").rdd.partitions.length == 1)
- sql("CACHE TABLE t3 as SELECT /*+ REPARTITION */ * FROM values(3) as
t(c)")
- assert(spark.table("t3").rdd.partitions.length == 2)
+ withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key
-> "false") {
+ sql("CACHE TABLE t1 as SELECT /*+ REPARTITION */ * FROM (" +
+ "SELECT distinct (id+1) FROM t0)")
+ assert(spark.table("t1").rdd.partitions.length == 2)
+ spark.sparkContext.listenerBus.waitUntilEmpty()
+ assert(finalPlan.nonEmpty && !finalPlan.contains("coalesced"))
+ }
+
+ finalPlan = "" // reset finalPlan
+ withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key
-> "true") {
+ sql("CACHE TABLE t2 as SELECT /*+ REPARTITION */ * FROM (" +
+ "SELECT distinct (id-1) FROM t0)")
+ assert(spark.table("t2").rdd.partitions.length == 2)
+ spark.sparkContext.listenerBus.waitUntilEmpty()
+ assert(finalPlan.nonEmpty && finalPlan.contains("coalesced"))
+ }
+ } finally {
+ spark.sparkContext.removeSparkListener(listener)
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]