This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 061545b2b [KYUUBI #4664] Fix empty relation when kill executors
061545b2b is described below
commit 061545b2bd632586d8d8367ff6d90dea3949e52f
Author: ulysses-you <[email protected]>
AuthorDate: Tue Apr 4 17:06:57 2023 +0800
[KYUUBI #4664] Fix empty relation when kill executors
### _Why are the changes needed?_
This PR fixes a corner case when repartitioning a local relation, e.g.,
```
Repartition
|
LocalRelation
```
It would throw an exception because no shuffle actually happens:
```
java.util.NoSuchElementException: key not found: 3
at scala.collection.MapLike.default(MapLike.scala:235)
at scala.collection.MapLike.default$(MapLike.scala:234)
at scala.collection.AbstractMap.default(Map.scala:63)
at scala.collection.MapLike.apply(MapLike.scala:144)
at scala.collection.MapLike.apply$(MapLike.scala:143)
at scala.collection.AbstractMap.apply(Map.scala:63)
at
org.apache.spark.sql.FinalStageResourceManager.findExecutorToKill(FinalStageResourceManager.scala:122)
at
org.apache.spark.sql.FinalStageResourceManager.killExecutors(FinalStageResourceManager.scala:175)
```
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run
test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #4664 from ulysses-you/kill-executors-followup.
Closes #4664
3811eaee9 [ulysses-you] Fix empty relation
Authored-by: ulysses-you <[email protected]>
Signed-off-by: ulyssesyou <[email protected]>
---
.../org/apache/spark/sql/FinalStageResourceManager.scala | 12 ++++++++++--
1 file changed, 10 insertions(+), 2 deletions(-)
diff --git
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala
b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala
index 2bf7ae6b7..ca3f762e1 100644
---
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala
+++
b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala
@@ -69,6 +69,7 @@ case class FinalStageResourceManager(session: SparkSession)
return plan
}
+ // TODO: move this to query stage optimizer when updating Spark to 3.5.x
// Since we are in `prepareQueryStage`, the AQE shuffle read has not been
applied.
// So we need to apply it by self.
val shuffleRead =
queryStageOptimizerRules.foldLeft(stageOpt.get.asInstanceOf[SparkPlan]) {
@@ -119,7 +120,11 @@ case class FinalStageResourceManager(session: SparkSession)
shuffleId: Int,
numReduce: Int): Seq[String] = {
val tracker =
SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
- val shuffleStatus = tracker.shuffleStatuses(shuffleId)
+ val shuffleStatusOpt = tracker.shuffleStatuses.get(shuffleId)
+ if (shuffleStatusOpt.isEmpty) {
+ return Seq.empty
+ }
+ val shuffleStatus = shuffleStatusOpt.get
val executorToBlockSize = new mutable.HashMap[String, Long]
shuffleStatus.withMapStatuses { mapStatus =>
mapStatus.foreach { status =>
@@ -175,6 +180,9 @@ case class FinalStageResourceManager(session: SparkSession)
val executorsToKill = findExecutorToKill(sc, targetExecutors, shuffleId,
numReduce)
logInfo(s"Request to kill executors, total count ${executorsToKill.size},
" +
s"[${executorsToKill.mkString(", ")}].")
+ if (executorsToKill.isEmpty) {
+ return
+ }
// Note, `SparkContext#killExecutors` does not allow with DRA enabled,
// see `https://github.com/apache/spark/pull/20604`.
@@ -201,7 +209,7 @@ trait FinalRebalanceStageHelper {
case f: FilterExec => findFinalRebalanceStage(f.child)
case s: SortExec if !s.global => findFinalRebalanceStage(s.child)
case stage: ShuffleQueryStageExec
- if stage.isMaterialized &&
+ if stage.isMaterialized && stage.mapStats.isDefined &&
stage.plan.isInstanceOf[ShuffleExchangeExec] &&
stage.plan.asInstanceOf[ShuffleExchangeExec].shuffleOrigin !=
ENSURE_REQUIREMENTS =>
Some(stage)