This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new d513f1f1e [KYUUBI #5136][Bug] Spark App may hang forever if FinalStageResourceManager killed all executors
d513f1f1e is described below
commit d513f1f1e60fc0ab53095a1f24cea5c354174891
Author: zhouyifan279 <[email protected]>
AuthorDate: Wed Aug 16 16:09:17 2023 +0800
[KYUUBI #5136][Bug] Spark App may hang forever if FinalStageResourceManager killed all executors
### _Why are the changes needed?_
In rare cases, the final Spark stage hangs forever when
spark.sql.finalWriteStage.eagerlyKillExecutors.enabled is true.
The bug occurs when two conditions are met at the same time:
1. All executors are either removed due to idle timeout or killed by FinalStageResourceManager.
   The target executor number in YarnAllocator is then set to 0 and no more executors will be launched.
2. The target executor number in ExecutorAllocationManager equals the number of executors needed by the final stage.
   In that case, ExecutorAllocationManager will not sync the target executor number to YarnAllocator, so no new executors are ever requested and the stage hangs (see the sketch below).
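
The fix re-checks the scheduler backend's requested executor total after the kill and, if it dropped below what the final write stage needs, requests the shortfall again. Below is a minimal sketch of that step, assuming it lives in package `org.apache.spark.sql` next to the rule; it reuses `getAdjustedTargetExecutors` and the `ExecutorAllocationClient` calls from the patch and is illustrative, not a drop-in:

```scala
import org.apache.spark.{ExecutorAllocationClient, SparkContext}

// Sketch: after killing executors, make sure the requested target did not fall
// below what the final write stage needs; otherwise request the delta so the
// cluster manager (e.g. YarnAllocator) launches executors again.
def reAdjustTargetExecutors(sc: SparkContext, targetExecutors: Int): Unit = {
  val client = sc.schedulerBackend.asInstanceOf[ExecutorAllocationClient]
  // getAdjustedTargetExecutors reads requestedTotalExecutorsPerResourceProfile
  // from CoarseGrainedSchedulerBackend via reflection (added in this patch).
  FinalStageResourceManager.getAdjustedTargetExecutors(sc)
    .filter(_ < targetExecutors)
    .foreach { adjusted =>
      val delta = targetExecutors - adjusted
      client.requestExecutors(delta)
    }
}
```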
### _How was this patch tested?_
- [x] Add a new test suite `FinalStageResourceManagerSuite`
Closes #5141 from zhouyifan279/adjust-executors.
Closes #5136
c4403eefa [zhouyifan279] assert adjustedTargetExecutors == 1
ea8f24733 [zhouyifan279] Add comment
5f3ca1d9c [zhouyifan279] [KYUUBI #5136][Bug] Spark App may hang forever if FinalStageResourceManager killed all executors
12687eee7 [zhouyifan279] [KYUUBI #5136][Bug] Spark App may hang forever if FinalStageResourceManager killed all executors
9dcbc780d [zhouyifan279] [KYUUBI #5136][Bug] Spark App may hang forever if FinalStageResourceManager killed all executors
Authored-by: zhouyifan279 <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.github/workflows/master.yml | 6 +--
.../spark/kyuubi-extension-spark-3-3/pom.xml | 48 +++++++++++++++++
.../kyuubi/sql/KyuubiSparkSQLExtension.scala | 2 +-
.../spark/sql/FinalStageResourceManager.scala | 42 ++++++++++++++-
.../spark/sql/FinalStageResourceManagerSuite.scala | 62 ++++++++++++++++++++++
.../spark/kyuubi-extension-spark-3-4/pom.xml | 54 +++++++++++++++++++
.../org/apache/kyuubi/sql/KyuubiSQLConf.scala | 8 +++
.../kyuubi/sql/KyuubiSparkSQLExtension.scala | 2 +-
.../spark/sql/FinalStageResourceManager.scala | 42 ++++++++++++++-
.../spark/sql/FinalStageResourceManagerSuite.scala | 62 ++++++++++++++++++++++
.../org/apache/kyuubi/sql/KyuubiSQLConf.scala | 8 +++
.../apache/kyuubi/tags/SparkLocalClusterTest.java | 29 ++++++++++
12 files changed, 358 insertions(+), 7 deletions(-)
diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml
index 674c7d060..00e5772a2 100644
--- a/.github/workflows/master.yml
+++ b/.github/workflows/master.yml
@@ -59,17 +59,17 @@ jobs:
           - java: 8
             spark: '3.3'
             spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.1.3 -Dspark.archive.name=spark-3.1.3-bin-hadoop3.2.tgz -Pzookeeper-3.6'
-            exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest'
+            exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.SparkLocalClusterTest'
             comment: 'verify-on-spark-3.1-binary'
           - java: 8
             spark: '3.3'
             spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.2.4 -Dspark.archive.name=spark-3.2.4-bin-hadoop3.2.tgz -Pzookeeper-3.6'
-            exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest'
+            exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.SparkLocalClusterTest'
             comment: 'verify-on-spark-3.2-binary'
           - java: 8
             spark: '3.3'
             spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.4.0 -Dspark.archive.name=spark-3.4.0-bin-hadoop3.tgz -Pzookeeper-3.6'
-            exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest'
+            exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.SparkLocalClusterTest'
             comment: 'verify-on-spark-3.4-binary'
exclude:
# SPARK-33772: Spark supports JDK 17 since 3.3.0
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/pom.xml b/extensions/spark/kyuubi-extension-spark-3-3/pom.xml
index 98c1cca02..51d21f684 100644
--- a/extensions/spark/kyuubi-extension-spark-3-3/pom.xml
+++ b/extensions/spark/kyuubi-extension-spark-3-3/pom.xml
@@ -37,6 +37,14 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-download</artifactId>
+ <version>${project.version}</version>
+ <type>pom</type>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-extension-spark-common_${scala.binary.version}</artifactId>
@@ -45,6 +53,14 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-util-scala_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
@@ -130,6 +146,38 @@
<build>
<plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>regex-property</id>
+ <goals>
+ <goal>regex-property</goal>
+ </goals>
+ <configuration>
+ <name>spark.home</name>
+              <value>${project.basedir}/../../../externals/kyuubi-download/target/${spark.archive.name}</value>
+ <regex>(.+)\.tgz</regex>
+ <replacement>$1</replacement>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ <configuration>
+ <environmentVariables>
+ <!--
+ Some tests run Spark in local-cluster mode.
+                This mode uses SPARK_HOME and SPARK_SCALA_VERSION to build command to launch a Spark Standalone Cluster.
+              -->
+              <SPARK_HOME>${spark.home}</SPARK_HOME>
+              <SPARK_SCALA_VERSION>${scala.binary.version}</SPARK_SCALA_VERSION>
+ </environmentVariables>
+ </configuration>
+ </plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
index 5d3464228..792315d89 100644
--- a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
@@ -40,7 +40,7 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) {
extensions.injectOptimizerRule(ForcedMaxOutputRowsRule)
extensions.injectPlannerStrategy(MaxScanStrategy)
- extensions.injectQueryStagePrepRule(FinalStageResourceManager)
+ extensions.injectQueryStagePrepRule(FinalStageResourceManager(_))
extensions.injectQueryStagePrepRule(InjectCustomResourceProfile)
}
}
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala
index dc573f838..32fb9f5ce 100644
--- a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala
@@ -22,6 +22,8 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{ExecutorAllocationClient, MapOutputTrackerMaster, SparkContext, SparkEnv}
+import org.apache.spark.internal.Logging
+import org.apache.spark.resource.ResourceProfile
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SortExec, SparkPlan}
@@ -185,7 +187,12 @@ case class FinalStageResourceManager(session: SparkSession)
numReduce: Int): Unit = {
val executorAllocationClient = sc.schedulerBackend.asInstanceOf[ExecutorAllocationClient]
-    val executorsToKill = findExecutorToKill(sc, targetExecutors, shuffleId, numReduce)
+    val executorsToKill =
+      if (conf.getConf(KyuubiSQLConf.FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_KILL_ALL)) {
+        executorAllocationClient.getExecutorIds()
+      } else {
+        findExecutorToKill(sc, targetExecutors, shuffleId, numReduce)
+      }
logInfo(s"Request to kill executors, total count ${executorsToKill.size}, " +
  s"[${executorsToKill.mkString(", ")}].")
if (executorsToKill.isEmpty) {
@@ -210,6 +217,14 @@ case class FinalStageResourceManager(session: SparkSession)
adjustTargetNumExecutors = true,
countFailures = false,
force = false)
+
+ FinalStageResourceManager.getAdjustedTargetExecutors(sc)
+ .filter(_ < targetExecutors).foreach { adjustedExecutors =>
+ val delta = targetExecutors - adjustedExecutors
+      logInfo(s"Target executors after kill ($adjustedExecutors) is lower than required " +
+        s"($targetExecutors). Requesting $delta additional executor(s).")
+ executorAllocationClient.requestExecutors(delta)
+ }
}
@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
@@ -218,6 +233,31 @@ case class FinalStageResourceManager(session: SparkSession)
OptimizeShuffleWithLocalRead)
}
+object FinalStageResourceManager extends Logging {
+
+  private[sql] def getAdjustedTargetExecutors(sc: SparkContext): Option[Int] = {
+ sc.schedulerBackend match {
+ case schedulerBackend: CoarseGrainedSchedulerBackend =>
+ try {
+ val field = classOf[CoarseGrainedSchedulerBackend]
+ .getDeclaredField("requestedTotalExecutorsPerResourceProfile")
+ field.setAccessible(true)
+ schedulerBackend.synchronized {
+ val requestedTotalExecutorsPerResourceProfile =
+            field.get(schedulerBackend).asInstanceOf[mutable.HashMap[ResourceProfile, Int]]
+ val defaultRp = sc.resourceProfileManager.defaultResourceProfile
+ requestedTotalExecutorsPerResourceProfile.get(defaultRp)
+ }
+ } catch {
+ case e: Exception =>
+          logWarning("Failed to get requestedTotalExecutors of Default ResourceProfile", e)
+ None
+ }
+ case _ => None
+ }
+ }
+}
+
trait FinalRebalanceStageHelper extends AdaptiveSparkPlanHelper {
@tailrec
final protected def findFinalRebalanceStage(plan: SparkPlan): Option[ShuffleQueryStageExec] = {
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala
new file mode 100644
index 000000000..4b9991ef6
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.SparkConf
+import org.scalatest.time.{Minutes, Span}
+
+import org.apache.kyuubi.sql.KyuubiSQLConf
+import org.apache.kyuubi.tags.SparkLocalClusterTest
+
+@SparkLocalClusterTest
+class FinalStageResourceManagerSuite extends KyuubiSparkSQLExtensionTest {
+
+ override def sparkConf(): SparkConf = {
+    // It is difficult to run spark in local-cluster mode when spark.testing is set.
+ sys.props.remove("spark.testing")
+
+ super.sparkConf().set("spark.master", "local-cluster[3, 1, 1024]")
+ .set("spark.dynamicAllocation.enabled", "true")
+ .set("spark.dynamicAllocation.initialExecutors", "3")
+ .set("spark.dynamicAllocation.minExecutors", "1")
+ .set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
+ .set(KyuubiSQLConf.FINAL_STAGE_CONFIG_ISOLATION.key, "true")
+      .set(KyuubiSQLConf.FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_ENABLED.key, "true")
+ }
+
+ test("[KYUUBI #5136][Bug] Final Stage hangs forever") {
+ // Prerequisite to reproduce the bug:
+ // 1. Dynamic allocation is enabled.
+ // 2. Dynamic allocation min executors is 1.
+ // 3. target executors < active executors.
+    //   4. No active executor is left after FinalStageResourceManager killed executors.
+    //      This is possible because FinalStageResourceManager retained executors may already be
+    //      requested to be killed but not died yet.
+    //   5. Final Stage required executors is 1.
+    withSQLConf(
+      (KyuubiSQLConf.FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_KILL_ALL.key, "true")) {
+      withTable("final_stage") {
+        eventually(timeout(Span(10, Minutes))) {
+          sql(
+            "CREATE TABLE final_stage AS SELECT id, count(*) as num FROM (SELECT 0 id) GROUP BY id")
+        }
+        assert(FinalStageResourceManager.getAdjustedTargetExecutors(spark.sparkContext).get == 1)
+ }
+ }
+ }
+}
diff --git a/extensions/spark/kyuubi-extension-spark-3-4/pom.xml b/extensions/spark/kyuubi-extension-spark-3-4/pom.xml
index 947c03ea0..20db5d12f 100644
--- a/extensions/spark/kyuubi-extension-spark-3-4/pom.xml
+++ b/extensions/spark/kyuubi-extension-spark-3-4/pom.xml
@@ -55,6 +55,22 @@
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-download</artifactId>
+ <version>${project.version}</version>
+ <type>pom</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-util-scala_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
@@ -111,11 +127,49 @@
<artifactId>jakarta.xml.bind-api</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>regex-property</id>
+ <goals>
+ <goal>regex-property</goal>
+ </goals>
+ <configuration>
+ <name>spark.home</name>
+              <value>${project.basedir}/../../../externals/kyuubi-download/target/${spark.archive.name}</value>
+ <regex>(.+)\.tgz</regex>
+ <replacement>$1</replacement>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ <configuration>
+ <environmentVariables>
+ <!--
+ Some tests run Spark in local-cluster mode.
+                This mode uses SPARK_HOME and SPARK_SCALA_VERSION to build command to launch a Spark Standalone Cluster.
+              -->
+              <SPARK_HOME>${spark.home}</SPARK_HOME>
+              <SPARK_SCALA_VERSION>${scala.binary.version}</SPARK_SCALA_VERSION>
+ </environmentVariables>
+ </configuration>
+ </plugin>
<plugin>
<groupId>org.antlr</groupId>
<artifactId>antlr4-maven-plugin</artifactId>
diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
index fa118a3e2..6f45dae12 100644
--- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
@@ -210,6 +210,14 @@ object KyuubiSQLConf {
.booleanConf
.createWithDefault(false)
+ val FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_KILL_ALL =
+ buildConf("spark.sql.finalWriteStage.eagerlyKillExecutors.killAll")
+      .doc("When true, eagerly kill all executors before running final write stage. " +
+ "Mainly for test.")
+ .version("1.8.0")
+ .booleanConf
+ .createWithDefault(false)
+
val FINAL_WRITE_STAGE_SKIP_KILLING_EXECUTORS_FOR_TABLE_CACHE =
buildConf("spark.sql.finalWriteStage.skipKillingExecutorsForTableCache")
.doc("When true, skip killing executors if the plan has table caches.")
diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
index 5d3464228..792315d89 100644
--- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
@@ -40,7 +40,7 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) {
extensions.injectOptimizerRule(ForcedMaxOutputRowsRule)
extensions.injectPlannerStrategy(MaxScanStrategy)
- extensions.injectQueryStagePrepRule(FinalStageResourceManager)
+ extensions.injectQueryStagePrepRule(FinalStageResourceManager(_))
extensions.injectQueryStagePrepRule(InjectCustomResourceProfile)
}
}
diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala
index 16002dfa0..81873476c 100644
--- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala
@@ -22,6 +22,8 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{ExecutorAllocationClient, MapOutputTrackerMaster, SparkContext, SparkEnv}
+import org.apache.spark.internal.Logging
+import org.apache.spark.resource.ResourceProfile
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SortExec, SparkPlan}
@@ -188,7 +190,12 @@ case class FinalStageResourceManager(session: SparkSession)
numReduce: Int): Unit = {
val executorAllocationClient = sc.schedulerBackend.asInstanceOf[ExecutorAllocationClient]
-    val executorsToKill = findExecutorToKill(sc, targetExecutors, shuffleId, numReduce)
+    val executorsToKill =
+      if (conf.getConf(KyuubiSQLConf.FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_KILL_ALL)) {
+        executorAllocationClient.getExecutorIds()
+      } else {
+        findExecutorToKill(sc, targetExecutors, shuffleId, numReduce)
+      }
logInfo(s"Request to kill executors, total count ${executorsToKill.size}, " +
  s"[${executorsToKill.mkString(", ")}].")
if (executorsToKill.isEmpty) {
@@ -213,6 +220,14 @@ case class FinalStageResourceManager(session: SparkSession)
adjustTargetNumExecutors = true,
countFailures = false,
force = false)
+
+ FinalStageResourceManager.getAdjustedTargetExecutors(sc)
+ .filter(_ < targetExecutors).foreach { adjustedExecutors =>
+ val delta = targetExecutors - adjustedExecutors
+      logInfo(s"Target executors after kill ($adjustedExecutors) is lower than required " +
+        s"($targetExecutors). Requesting $delta additional executor(s).")
+ executorAllocationClient.requestExecutors(delta)
+ }
}
@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
@@ -221,6 +236,31 @@ case class FinalStageResourceManager(session: SparkSession)
OptimizeShuffleWithLocalRead)
}
+object FinalStageResourceManager extends Logging {
+
+  private[sql] def getAdjustedTargetExecutors(sc: SparkContext): Option[Int] = {
+ sc.schedulerBackend match {
+ case schedulerBackend: CoarseGrainedSchedulerBackend =>
+ try {
+ val field = classOf[CoarseGrainedSchedulerBackend]
+ .getDeclaredField("requestedTotalExecutorsPerResourceProfile")
+ field.setAccessible(true)
+ schedulerBackend.synchronized {
+ val requestedTotalExecutorsPerResourceProfile =
+            field.get(schedulerBackend).asInstanceOf[mutable.HashMap[ResourceProfile, Int]]
+ val defaultRp = sc.resourceProfileManager.defaultResourceProfile
+ requestedTotalExecutorsPerResourceProfile.get(defaultRp)
+ }
+ } catch {
+ case e: Exception =>
+          logWarning("Failed to get requestedTotalExecutors of Default ResourceProfile", e)
+ None
+ }
+ case _ => None
+ }
+ }
+}
+
trait FinalRebalanceStageHelper extends AdaptiveSparkPlanHelper {
@tailrec
final protected def findFinalRebalanceStage(plan: SparkPlan): Option[ShuffleQueryStageExec] = {
diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala
new file mode 100644
index 000000000..4b9991ef6
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.SparkConf
+import org.scalatest.time.{Minutes, Span}
+
+import org.apache.kyuubi.sql.KyuubiSQLConf
+import org.apache.kyuubi.tags.SparkLocalClusterTest
+
+@SparkLocalClusterTest
+class FinalStageResourceManagerSuite extends KyuubiSparkSQLExtensionTest {
+
+ override def sparkConf(): SparkConf = {
+    // It is difficult to run spark in local-cluster mode when spark.testing is set.
+ sys.props.remove("spark.testing")
+
+ super.sparkConf().set("spark.master", "local-cluster[3, 1, 1024]")
+ .set("spark.dynamicAllocation.enabled", "true")
+ .set("spark.dynamicAllocation.initialExecutors", "3")
+ .set("spark.dynamicAllocation.minExecutors", "1")
+ .set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
+ .set(KyuubiSQLConf.FINAL_STAGE_CONFIG_ISOLATION.key, "true")
+      .set(KyuubiSQLConf.FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_ENABLED.key, "true")
+ }
+
+ test("[KYUUBI #5136][Bug] Final Stage hangs forever") {
+ // Prerequisite to reproduce the bug:
+ // 1. Dynamic allocation is enabled.
+ // 2. Dynamic allocation min executors is 1.
+ // 3. target executors < active executors.
+    //   4. No active executor is left after FinalStageResourceManager killed executors.
+    //      This is possible because FinalStageResourceManager retained executors may already be
+    //      requested to be killed but not died yet.
+    //   5. Final Stage required executors is 1.
+    withSQLConf(
+      (KyuubiSQLConf.FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_KILL_ALL.key, "true")) {
+      withTable("final_stage") {
+        eventually(timeout(Span(10, Minutes))) {
+          sql(
+            "CREATE TABLE final_stage AS SELECT id, count(*) as num FROM (SELECT 0 id) GROUP BY id")
+        }
+        assert(FinalStageResourceManager.getAdjustedTargetExecutors(spark.sparkContext).get == 1)
+ }
+ }
+ }
+}
diff --git a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
index fa118a3e2..6f45dae12 100644
--- a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
+++ b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
@@ -210,6 +210,14 @@ object KyuubiSQLConf {
.booleanConf
.createWithDefault(false)
+ val FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_KILL_ALL =
+ buildConf("spark.sql.finalWriteStage.eagerlyKillExecutors.killAll")
+      .doc("When true, eagerly kill all executors before running final write stage. " +
+ "Mainly for test.")
+ .version("1.8.0")
+ .booleanConf
+ .createWithDefault(false)
+
val FINAL_WRITE_STAGE_SKIP_KILLING_EXECUTORS_FOR_TABLE_CACHE =
buildConf("spark.sql.finalWriteStage.skipKillingExecutorsForTableCache")
.doc("When true, skip killing executors if the plan has table caches.")
diff --git a/kyuubi-util-scala/src/test/java/org/apache/kyuubi/tags/SparkLocalClusterTest.java b/kyuubi-util-scala/src/test/java/org/apache/kyuubi/tags/SparkLocalClusterTest.java
new file mode 100644
index 000000000..dd718f125
--- /dev/null
+++ b/kyuubi-util-scala/src/test/java/org/apache/kyuubi/tags/SparkLocalClusterTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.tags;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import org.scalatest.TagAnnotation;
+
+@TagAnnotation
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.METHOD, ElementType.TYPE})
+public @interface SparkLocalClusterTest {}