This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new d513f1f1e [KYUUBI #5136][Bug] Spark App may hang forever if FinalStageResourceManager killed all executors
d513f1f1e is described below

commit d513f1f1e60fc0ab53095a1f24cea5c354174891
Author: zhouyifan279 <[email protected]>
AuthorDate: Wed Aug 16 16:09:17 2023 +0800

    [KYUUBI #5136][Bug] Spark App may hang forever if FinalStageResourceManager killed all executors
    
    ### _Why are the changes needed?_
    In rare cases, a Spark stage can hang forever when spark.sql.finalWriteStage.eagerlyKillExecutors.enabled is true.
    
    The bug occurs when two conditions are met at the same time:
    
    1. All executors are either removed due to idle timeout or killed by FinalStageResourceManager.
       The target executor count in YarnAllocator is then set to 0, so no more executors will be launched.
    2. The target executor count in ExecutorAllocationManager equals the executor count needed by the final stage,
       so ExecutorAllocationManager never syncs the target executor count to YarnAllocator.
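    
    The gist of the fix, condensed from the FinalStageResourceManager change in the diff below (identifiers such as sc, targetExecutors and executorAllocationClient come from the surrounding method; this is an illustrative excerpt, not the complete patch): after the kill request, re-read the executor target tracked by the scheduler backend and, if it fell below what the final stage needs, explicitly request the missing executors.
    
        // Condensed Scala sketch of the mitigation (see the full diff below).
        // getAdjustedTargetExecutors reflectively reads the backend's requested
        // total for the default ResourceProfile and may return None.
        FinalStageResourceManager.getAdjustedTargetExecutors(sc)
          .filter(_ < targetExecutors)
          .foreach { adjustedExecutors =>
            val delta = targetExecutors - adjustedExecutors
            logInfo(s"Target executors after kill ($adjustedExecutors) is lower than required " +
              s"($targetExecutors). Requesting $delta additional executor(s).")
            executorAllocationClient.requestExecutors(delta)
          }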
    
    ### _How was this patch tested?_
    - [x] Add a new test suite `FinalStageResourceManagerSuite`
    
    Closes #5141 from zhouyifan279/adjust-executors.
    
    Closes #5136
    
    c4403eefa [zhouyifan279] assert adjustedTargetExecutors == 1
    ea8f24733 [zhouyifan279] Add comment
    5f3ca1d9c [zhouyifan279] [KYUUBI #5136][Bug] Spark App may hang forever if FinalStageResourceManager killed all executors
    12687eee7 [zhouyifan279] [KYUUBI #5136][Bug] Spark App may hang forever if FinalStageResourceManager killed all executors
    9dcbc780d [zhouyifan279] [KYUUBI #5136][Bug] Spark App may hang forever if FinalStageResourceManager killed all executors
    
    Authored-by: zhouyifan279 <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .github/workflows/master.yml                       |  6 +--
 .../spark/kyuubi-extension-spark-3-3/pom.xml       | 48 +++++++++++++++++
 .../kyuubi/sql/KyuubiSparkSQLExtension.scala       |  2 +-
 .../spark/sql/FinalStageResourceManager.scala      | 42 ++++++++++++++-
 .../spark/sql/FinalStageResourceManagerSuite.scala | 62 ++++++++++++++++++++++
 .../spark/kyuubi-extension-spark-3-4/pom.xml       | 54 +++++++++++++++++++
 .../org/apache/kyuubi/sql/KyuubiSQLConf.scala      |  8 +++
 .../kyuubi/sql/KyuubiSparkSQLExtension.scala       |  2 +-
 .../spark/sql/FinalStageResourceManager.scala      | 42 ++++++++++++++-
 .../spark/sql/FinalStageResourceManagerSuite.scala | 62 ++++++++++++++++++++++
 .../org/apache/kyuubi/sql/KyuubiSQLConf.scala      |  8 +++
 .../apache/kyuubi/tags/SparkLocalClusterTest.java  | 29 ++++++++++
 12 files changed, 358 insertions(+), 7 deletions(-)

diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml
index 674c7d060..00e5772a2 100644
--- a/.github/workflows/master.yml
+++ b/.github/workflows/master.yml
@@ -59,17 +59,17 @@ jobs:
           - java: 8
             spark: '3.3'
             spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.1.3 -Dspark.archive.name=spark-3.1.3-bin-hadoop3.2.tgz -Pzookeeper-3.6'
-            exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest'
+            exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.SparkLocalClusterTest'
             comment: 'verify-on-spark-3.1-binary'
           - java: 8
             spark: '3.3'
             spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.2.4 -Dspark.archive.name=spark-3.2.4-bin-hadoop3.2.tgz -Pzookeeper-3.6'
-            exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest'
+            exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.SparkLocalClusterTest'
             comment: 'verify-on-spark-3.2-binary'
           - java: 8
             spark: '3.3'
             spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.4.0 -Dspark.archive.name=spark-3.4.0-bin-hadoop3.tgz -Pzookeeper-3.6'
-            exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest'
+            exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.SparkLocalClusterTest'
             comment: 'verify-on-spark-3.4-binary'
         exclude:
           # SPARK-33772: Spark supports JDK 17 since 3.3.0
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/pom.xml b/extensions/spark/kyuubi-extension-spark-3-3/pom.xml
index 98c1cca02..51d21f684 100644
--- a/extensions/spark/kyuubi-extension-spark-3-3/pom.xml
+++ b/extensions/spark/kyuubi-extension-spark-3-3/pom.xml
@@ -37,6 +37,14 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.kyuubi</groupId>
+            <artifactId>kyuubi-download</artifactId>
+            <version>${project.version}</version>
+            <type>pom</type>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.kyuubi</groupId>
             <artifactId>kyuubi-extension-spark-common_${scala.binary.version}</artifactId>
@@ -45,6 +53,14 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.kyuubi</groupId>
+            <artifactId>kyuubi-util-scala_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.scala-lang</groupId>
             <artifactId>scala-library</artifactId>
@@ -130,6 +146,38 @@
     <build>
 
         <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>regex-property</id>
+                        <goals>
+                            <goal>regex-property</goal>
+                        </goals>
+                        <configuration>
+                            <name>spark.home</name>
+                            <value>${project.basedir}/../../../externals/kyuubi-download/target/${spark.archive.name}</value>
+                            <regex>(.+)\.tgz</regex>
+                            <replacement>$1</replacement>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.scalatest</groupId>
+                <artifactId>scalatest-maven-plugin</artifactId>
+                <configuration>
+                    <environmentVariables>
+                        <!--
+                          Some tests run Spark in local-cluster mode.
+                          This mode uses SPARK_HOME and SPARK_SCALA_VERSION to build command to launch a Spark Standalone Cluster.
+                          -->
+                        <SPARK_HOME>${spark.home}</SPARK_HOME>
+                        <SPARK_SCALA_VERSION>${scala.binary.version}</SPARK_SCALA_VERSION>
+                    </environmentVariables>
+                </configuration>
+            </plugin>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-shade-plugin</artifactId>
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
index 5d3464228..792315d89 100644
--- a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
@@ -40,7 +40,7 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) {
     extensions.injectOptimizerRule(ForcedMaxOutputRowsRule)
     extensions.injectPlannerStrategy(MaxScanStrategy)
 
-    extensions.injectQueryStagePrepRule(FinalStageResourceManager)
+    extensions.injectQueryStagePrepRule(FinalStageResourceManager(_))
     extensions.injectQueryStagePrepRule(InjectCustomResourceProfile)
   }
 }
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala
index dc573f838..32fb9f5ce 100644
--- a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala
@@ -22,6 +22,8 @@ import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.{ExecutorAllocationClient, MapOutputTrackerMaster, SparkContext, SparkEnv}
+import org.apache.spark.internal.Logging
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SortExec, SparkPlan}
@@ -185,7 +187,12 @@ case class FinalStageResourceManager(session: SparkSession)
       numReduce: Int): Unit = {
     val executorAllocationClient = sc.schedulerBackend.asInstanceOf[ExecutorAllocationClient]
 
-    val executorsToKill = findExecutorToKill(sc, targetExecutors, shuffleId, numReduce)
+    val executorsToKill =
+      if (conf.getConf(KyuubiSQLConf.FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_KILL_ALL)) {
+        executorAllocationClient.getExecutorIds()
+      } else {
+        findExecutorToKill(sc, targetExecutors, shuffleId, numReduce)
+      }
     logInfo(s"Request to kill executors, total count ${executorsToKill.size}, " +
       s"[${executorsToKill.mkString(", ")}].")
     if (executorsToKill.isEmpty) {
@@ -210,6 +217,14 @@ case class FinalStageResourceManager(session: SparkSession)
       adjustTargetNumExecutors = true,
       countFailures = false,
       force = false)
+
+    FinalStageResourceManager.getAdjustedTargetExecutors(sc)
+      .filter(_ < targetExecutors).foreach { adjustedExecutors =>
+        val delta = targetExecutors - adjustedExecutors
+        logInfo(s"Target executors after kill ($adjustedExecutors) is lower than required " +
+          s"($targetExecutors). Requesting $delta additional executor(s).")
+        executorAllocationClient.requestExecutors(delta)
+      }
   }
 
   @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
@@ -218,6 +233,31 @@ case class FinalStageResourceManager(session: SparkSession)
     OptimizeShuffleWithLocalRead)
 }
 
+object FinalStageResourceManager extends Logging {
+
+  private[sql] def getAdjustedTargetExecutors(sc: SparkContext): Option[Int] = {
+    sc.schedulerBackend match {
+      case schedulerBackend: CoarseGrainedSchedulerBackend =>
+        try {
+          val field = classOf[CoarseGrainedSchedulerBackend]
+            .getDeclaredField("requestedTotalExecutorsPerResourceProfile")
+          field.setAccessible(true)
+          schedulerBackend.synchronized {
+            val requestedTotalExecutorsPerResourceProfile =
+              field.get(schedulerBackend).asInstanceOf[mutable.HashMap[ResourceProfile, Int]]
+            val defaultRp = sc.resourceProfileManager.defaultResourceProfile
+            requestedTotalExecutorsPerResourceProfile.get(defaultRp)
+          }
+        } catch {
+          case e: Exception =>
+            logWarning("Failed to get requestedTotalExecutors of Default ResourceProfile", e)
+            None
+        }
+      case _ => None
+    }
+  }
+}
+
 trait FinalRebalanceStageHelper extends AdaptiveSparkPlanHelper {
   @tailrec
   final protected def findFinalRebalanceStage(plan: SparkPlan): Option[ShuffleQueryStageExec] = {
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala
new file mode 100644
index 000000000..4b9991ef6
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.SparkConf
+import org.scalatest.time.{Minutes, Span}
+
+import org.apache.kyuubi.sql.KyuubiSQLConf
+import org.apache.kyuubi.tags.SparkLocalClusterTest
+
+@SparkLocalClusterTest
+class FinalStageResourceManagerSuite extends KyuubiSparkSQLExtensionTest {
+
+  override def sparkConf(): SparkConf = {
+    // It is difficult to run spark in local-cluster mode when spark.testing is set.
+    sys.props.remove("spark.testing")
+
+    super.sparkConf().set("spark.master", "local-cluster[3, 1, 1024]")
+      .set("spark.dynamicAllocation.enabled", "true")
+      .set("spark.dynamicAllocation.initialExecutors", "3")
+      .set("spark.dynamicAllocation.minExecutors", "1")
+      .set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
+      .set(KyuubiSQLConf.FINAL_STAGE_CONFIG_ISOLATION.key, "true")
+      .set(KyuubiSQLConf.FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_ENABLED.key, "true")
+  }
+
+  test("[KYUUBI #5136][Bug] Final Stage hangs forever") {
+    // Prerequisite to reproduce the bug:
+    // 1. Dynamic allocation is enabled.
+    // 2. Dynamic allocation min executors is 1.
+    // 3. target executors < active executors.
+    // 4. No active executor is left after FinalStageResourceManager killed executors.
+    //    This is possible because FinalStageResourceManager retained executors may already be
+    //    requested to be killed but not died yet.
+    // 5. Final Stage required executors is 1.
+    withSQLConf(
+      (KyuubiSQLConf.FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_KILL_ALL.key, "true")) {
+      withTable("final_stage") {
+        eventually(timeout(Span(10, Minutes))) {
+          sql(
+            "CREATE TABLE final_stage AS SELECT id, count(*) as num FROM (SELECT 0 id) GROUP BY id")
+        }
+        assert(FinalStageResourceManager.getAdjustedTargetExecutors(spark.sparkContext).get == 1)
+      }
+    }
+  }
+}
diff --git a/extensions/spark/kyuubi-extension-spark-3-4/pom.xml b/extensions/spark/kyuubi-extension-spark-3-4/pom.xml
index 947c03ea0..20db5d12f 100644
--- a/extensions/spark/kyuubi-extension-spark-3-4/pom.xml
+++ b/extensions/spark/kyuubi-extension-spark-3-4/pom.xml
@@ -55,6 +55,22 @@
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.kyuubi</groupId>
+            <artifactId>kyuubi-download</artifactId>
+            <version>${project.version}</version>
+            <type>pom</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kyuubi</groupId>
+            <artifactId>kyuubi-util-scala_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-core_${scala.binary.version}</artifactId>
@@ -111,11 +127,49 @@
             <artifactId>jakarta.xml.bind-api</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
 
         <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>regex-property</id>
+                        <goals>
+                            <goal>regex-property</goal>
+                        </goals>
+                        <configuration>
+                            <name>spark.home</name>
+                            <value>${project.basedir}/../../../externals/kyuubi-download/target/${spark.archive.name}</value>
+                            <regex>(.+)\.tgz</regex>
+                            <replacement>$1</replacement>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.scalatest</groupId>
+                <artifactId>scalatest-maven-plugin</artifactId>
+                <configuration>
+                    <environmentVariables>
+                        <!--
+                        Some tests run Spark in local-cluster mode.
+                        This mode uses SPARK_HOME and SPARK_SCALA_VERSION to build command to launch a Spark Standalone Cluster.
+                        -->
+                        <SPARK_HOME>${spark.home}</SPARK_HOME>
+                        <SPARK_SCALA_VERSION>${scala.binary.version}</SPARK_SCALA_VERSION>
+                    </environmentVariables>
+                </configuration>
+            </plugin>
             <plugin>
                 <groupId>org.antlr</groupId>
                 <artifactId>antlr4-maven-plugin</artifactId>
diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
index fa118a3e2..6f45dae12 100644
--- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
@@ -210,6 +210,14 @@ object KyuubiSQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_KILL_ALL =
+    buildConf("spark.sql.finalWriteStage.eagerlyKillExecutors.killAll")
+      .doc("When true, eagerly kill all executors before running final write stage. " +
+        "Mainly for test.")
+      .version("1.8.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val FINAL_WRITE_STAGE_SKIP_KILLING_EXECUTORS_FOR_TABLE_CACHE =
     buildConf("spark.sql.finalWriteStage.skipKillingExecutorsForTableCache")
       .doc("When true, skip killing executors if the plan has table caches.")
diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
index 5d3464228..792315d89 100644
--- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
@@ -40,7 +40,7 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) {
     extensions.injectOptimizerRule(ForcedMaxOutputRowsRule)
     extensions.injectPlannerStrategy(MaxScanStrategy)
 
-    extensions.injectQueryStagePrepRule(FinalStageResourceManager)
+    extensions.injectQueryStagePrepRule(FinalStageResourceManager(_))
     extensions.injectQueryStagePrepRule(InjectCustomResourceProfile)
   }
 }
diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala
index 16002dfa0..81873476c 100644
--- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala
@@ -22,6 +22,8 @@ import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.{ExecutorAllocationClient, MapOutputTrackerMaster, SparkContext, SparkEnv}
+import org.apache.spark.internal.Logging
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SortExec, SparkPlan}
@@ -188,7 +190,12 @@ case class FinalStageResourceManager(session: SparkSession)
       numReduce: Int): Unit = {
     val executorAllocationClient = sc.schedulerBackend.asInstanceOf[ExecutorAllocationClient]
 
-    val executorsToKill = findExecutorToKill(sc, targetExecutors, shuffleId, numReduce)
+    val executorsToKill =
+      if (conf.getConf(KyuubiSQLConf.FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_KILL_ALL)) {
+        executorAllocationClient.getExecutorIds()
+      } else {
+        findExecutorToKill(sc, targetExecutors, shuffleId, numReduce)
+      }
     logInfo(s"Request to kill executors, total count ${executorsToKill.size}, " +
       s"[${executorsToKill.mkString(", ")}].")
     if (executorsToKill.isEmpty) {
@@ -213,6 +220,14 @@ case class FinalStageResourceManager(session: SparkSession)
       adjustTargetNumExecutors = true,
       countFailures = false,
       force = false)
+
+    FinalStageResourceManager.getAdjustedTargetExecutors(sc)
+      .filter(_ < targetExecutors).foreach { adjustedExecutors =>
+        val delta = targetExecutors - adjustedExecutors
+        logInfo(s"Target executors after kill ($adjustedExecutors) is lower than required " +
+          s"($targetExecutors). Requesting $delta additional executor(s).")
+        executorAllocationClient.requestExecutors(delta)
+      }
   }
 
   @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
@@ -221,6 +236,31 @@ case class FinalStageResourceManager(session: SparkSession)
     OptimizeShuffleWithLocalRead)
 }
 
+object FinalStageResourceManager extends Logging {
+
+  private[sql] def getAdjustedTargetExecutors(sc: SparkContext): Option[Int] = {
+    sc.schedulerBackend match {
+      case schedulerBackend: CoarseGrainedSchedulerBackend =>
+        try {
+          val field = classOf[CoarseGrainedSchedulerBackend]
+            .getDeclaredField("requestedTotalExecutorsPerResourceProfile")
+          field.setAccessible(true)
+          schedulerBackend.synchronized {
+            val requestedTotalExecutorsPerResourceProfile =
+              field.get(schedulerBackend).asInstanceOf[mutable.HashMap[ResourceProfile, Int]]
+            val defaultRp = sc.resourceProfileManager.defaultResourceProfile
+            requestedTotalExecutorsPerResourceProfile.get(defaultRp)
+          }
+        } catch {
+          case e: Exception =>
+            logWarning("Failed to get requestedTotalExecutors of Default ResourceProfile", e)
+            None
+        }
+      case _ => None
+    }
+  }
+}
+
 trait FinalRebalanceStageHelper extends AdaptiveSparkPlanHelper {
   @tailrec
   final protected def findFinalRebalanceStage(plan: SparkPlan): Option[ShuffleQueryStageExec] = {
diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala
new file mode 100644
index 000000000..4b9991ef6
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/FinalStageResourceManagerSuite.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.SparkConf
+import org.scalatest.time.{Minutes, Span}
+
+import org.apache.kyuubi.sql.KyuubiSQLConf
+import org.apache.kyuubi.tags.SparkLocalClusterTest
+
+@SparkLocalClusterTest
+class FinalStageResourceManagerSuite extends KyuubiSparkSQLExtensionTest {
+
+  override def sparkConf(): SparkConf = {
+    // It is difficult to run spark in local-cluster mode when spark.testing is set.
+    sys.props.remove("spark.testing")
+
+    super.sparkConf().set("spark.master", "local-cluster[3, 1, 1024]")
+      .set("spark.dynamicAllocation.enabled", "true")
+      .set("spark.dynamicAllocation.initialExecutors", "3")
+      .set("spark.dynamicAllocation.minExecutors", "1")
+      .set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
+      .set(KyuubiSQLConf.FINAL_STAGE_CONFIG_ISOLATION.key, "true")
+      .set(KyuubiSQLConf.FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_ENABLED.key, "true")
+  }
+
+  test("[KYUUBI #5136][Bug] Final Stage hangs forever") {
+    // Prerequisite to reproduce the bug:
+    // 1. Dynamic allocation is enabled.
+    // 2. Dynamic allocation min executors is 1.
+    // 3. target executors < active executors.
+    // 4. No active executor is left after FinalStageResourceManager killed executors.
+    //    This is possible because FinalStageResourceManager retained executors may already be
+    //    requested to be killed but not died yet.
+    // 5. Final Stage required executors is 1.
+    withSQLConf(
+      (KyuubiSQLConf.FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_KILL_ALL.key, "true")) {
+      withTable("final_stage") {
+        eventually(timeout(Span(10, Minutes))) {
+          sql(
+            "CREATE TABLE final_stage AS SELECT id, count(*) as num FROM (SELECT 0 id) GROUP BY id")
+        }
+        assert(FinalStageResourceManager.getAdjustedTargetExecutors(spark.sparkContext).get == 1)
+      }
+    }
+  }
+}
diff --git a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
index fa118a3e2..6f45dae12 100644
--- a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
+++ b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
@@ -210,6 +210,14 @@ object KyuubiSQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val FINAL_WRITE_STAGE_EAGERLY_KILL_EXECUTORS_KILL_ALL =
+    buildConf("spark.sql.finalWriteStage.eagerlyKillExecutors.killAll")
+      .doc("When true, eagerly kill all executors before running final write stage. " +
+        "Mainly for test.")
+      .version("1.8.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val FINAL_WRITE_STAGE_SKIP_KILLING_EXECUTORS_FOR_TABLE_CACHE =
     buildConf("spark.sql.finalWriteStage.skipKillingExecutorsForTableCache")
       .doc("When true, skip killing executors if the plan has table caches.")
diff --git a/kyuubi-util-scala/src/test/java/org/apache/kyuubi/tags/SparkLocalClusterTest.java b/kyuubi-util-scala/src/test/java/org/apache/kyuubi/tags/SparkLocalClusterTest.java
new file mode 100644
index 000000000..dd718f125
--- /dev/null
+++ b/kyuubi-util-scala/src/test/java/org/apache/kyuubi/tags/SparkLocalClusterTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.tags;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import org.scalatest.TagAnnotation;
+
+@TagAnnotation
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.METHOD, ElementType.TYPE})
+public @interface SparkLocalClusterTest {}
