This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 91a2ab366 [KYUUBI #4678] Improve FinalStageResourceManager kill
executors
91a2ab366 is described below
commit 91a2ab3665f44ade8aa768a9bf125bcd8a71478f
Author: ulysses-you <[email protected]>
AuthorDate: Mon Apr 10 11:41:37 2023 +0800
[KYUUBI #4678] Improve FinalStageResourceManager kill executors
### _Why are the changes needed?_
This PR changes two things:
1. Add a config to skip killing executors if the plan contains table caches.
Killing an executor drops its cached blocks, which must then be recomputed, so
it's not always safe to kill executors if the cache is referenced by two
write-like plans.
2. Force `adjustTargetNumExecutors` when killing executors. Otherwise
`YarnAllocator` might re-request the original target number of executors if
DRA has not updated the target yet. Note that DRA re-adjusts executors if there
are more tasks to be executed, so this is safe. It's better to adjust the
target number of executors as soon as we kill executors.
### _How was this patch tested?_
These issues were found during my POC.
Closes #4678 from ulysses-you/skip-cache.
Closes #4678
b12620954 [ulysses-you] Improve kill executors
Authored-by: ulysses-you <[email protected]>
Signed-off-by: ulyssesyou <[email protected]>
---
docs/extensions/engines/spark/rules.md | 1 +
.../spark/sql/FinalStageResourceManager.scala | 28 ++++++++++++++++++++--
.../org/apache/kyuubi/sql/KyuubiSQLConf.scala | 7 ++++++
3 files changed, 34 insertions(+), 2 deletions(-)
diff --git a/docs/extensions/engines/spark/rules.md
b/docs/extensions/engines/spark/rules.md
index a4bda5d53..46e8dd3d1 100644
--- a/docs/extensions/engines/spark/rules.md
+++ b/docs/extensions/engines/spark/rules.md
@@ -84,6 +84,7 @@ Kyuubi provides some configs to make these feature easy to
use.
| spark.sql.optimizer.insertRepartitionBeforeWriteIfNoShuffle.enabled | false
| When true, add repartition even if the
original plan does not have shuffle.
| 1.7.0 |
| spark.sql.optimizer.finalStageConfigIsolationWriteOnly.enabled | true
| When true, only enable final stage isolation
for writing.
| 1.7.0 |
| spark.sql.finalWriteStage.eagerlyKillExecutors.enabled | false
| When true, eagerly kill redundant executors
before running final write stage.
| 1.8.0 |
+| spark.sql.finalWriteStage.skipKillingExecutorsForTableCache | true
| When true, skip killing executors if the plan
has table caches.
| 1.8.0 |
| spark.sql.finalWriteStage.retainExecutorsFactor | 1.2
| If the target executors * factor < active
executors, and target executors * factor > min executors, then inject kill
executors or inject custom resource profile.
| 1.8.0 |
| spark.sql.finalWriteStage.resourceIsolation.enabled | false
| When true, make final write stage resource
isolation using custom RDD resource profile.
| 1.2.0 |
| spark.sql.finalWriteStageExecutorCores |
fallback spark.executor.cores | Specify the executor core request for
final write stage. It would be passed to the RDD resource profile.
| 1.8.0 |
diff --git
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala
b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala
index ca3f762e1..7a0ae1592 100644
---
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala
+++
b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala
@@ -26,6 +26,7 @@ import
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SortExec,
SparkPlan}
import org.apache.spark.sql.execution.adaptive._
+import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS,
ShuffleExchangeExec}
import org.apache.kyuubi.sql.{KyuubiSQLConf, MarkNumOutputColumnsRule}
@@ -69,6 +70,13 @@ case class FinalStageResourceManager(session: SparkSession)
return plan
}
+ // It's not safe to kill executors if this plan contains table cache.
+ // If the executor loses then the rdd would re-compute those partition.
+ if (hasTableCache(plan) &&
+
conf.getConf(KyuubiSQLConf.FINAL_WRITE_STAGE_SKIP_KILLING_EXECUTORS_FOR_TABLE_CACHE))
{
+ return plan
+ }
+
// TODO: move this to query stage optimizer when updating Spark to 3.5.x
// Since we are in `prepareQueryStage`, the AQE shuffle read has not been
applied.
// So we need to apply it by self.
@@ -188,9 +196,18 @@ case class FinalStageResourceManager(session: SparkSession)
// see `https://github.com/apache/spark/pull/20604`.
// It may cause the status in `ExecutorAllocationManager` inconsistent with
// `CoarseGrainedSchedulerBackend` for a while. But it should be
synchronous finally.
+ //
+ // We should adjust target num executors, otherwise `YarnAllocator` might
re-request original
+ // target executors if DRA has not updated target executors yet.
+ // Note, DRA would re-adjust executors if there are more tasks to be
executed, so we are safe.
+ //
+ // * We kill executor
+ // * YarnAllocator re-request target executors
+ // * DRA can not release executors since they are new added
+ // ---------------------------------------------------------------->
timeline
executorAllocationClient.killExecutors(
executorIds = executorsToKill,
- adjustTargetNumExecutors = false,
+ adjustTargetNumExecutors = true,
countFailures = false,
force = false)
}
@@ -201,7 +218,7 @@ case class FinalStageResourceManager(session: SparkSession)
OptimizeShuffleWithLocalRead)
}
-trait FinalRebalanceStageHelper {
+trait FinalRebalanceStageHelper extends AdaptiveSparkPlanHelper {
@tailrec
final protected def findFinalRebalanceStage(plan: SparkPlan):
Option[ShuffleQueryStageExec] = {
plan match {
@@ -216,4 +233,11 @@ trait FinalRebalanceStageHelper {
case _ => None
}
}
+
+ final protected def hasTableCache(plan: SparkPlan): Boolean = {
+ find(plan) {
+ case _: InMemoryTableScanExec => true
+ case _ => false
+ }.isDefined
+ }
}
diff --git
a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
index 4df924b51..aeee45869 100644
---
a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
+++
b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
@@ -198,6 +198,13 @@ object KyuubiSQLConf {
.booleanConf
.createWithDefault(false)
+ val FINAL_WRITE_STAGE_SKIP_KILLING_EXECUTORS_FOR_TABLE_CACHE =
+ buildConf("spark.sql.finalWriteStage.skipKillingExecutorsForTableCache")
+ .doc("When true, skip killing executors if the plan has table caches.")
+ .version("1.8.0")
+ .booleanConf
+ .createWithDefault(true)
+
val FINAL_WRITE_STAGE_PARTITION_FACTOR =
buildConf("spark.sql.finalWriteStage.retainExecutorsFactor")
.doc("If the target executors * factor < active executors, and " +