This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 91a2ab366 [KYUUBI #4678] Improve FinalStageResourceManager kill 
executors
91a2ab366 is described below

commit 91a2ab3665f44ade8aa768a9bf125bcd8a71478f
Author: ulysses-you <[email protected]>
AuthorDate: Mon Apr 10 11:41:37 2023 +0800

    [KYUUBI #4678] Improve FinalStageResourceManager kill executors
    
    ### _Why are the changes needed?_
    
    This pr change two things:
    1. add a config to skip killing executors if the plan contains table caches. It's 
not always safe to kill executors if the cache is referenced by two write-like 
plans.
    2. force adjustTargetNumExecutors when killing executors. `YarnAllocator` 
might re-request the original target executors if DRA has not updated the target 
executors yet. Note, DRA would re-adjust executors if there are more tasks to 
be executed, so we are safe. It's better to adjust the target number of executors 
once we kill executors.
    
    ### _How was this patch tested?_
    These issues were found during my POC
    
    Closes #4678 from ulysses-you/skip-cache.
    
    Closes #4678
    
    b12620954 [ulysses-you] Improve kill executors
    
    Authored-by: ulysses-you <[email protected]>
    Signed-off-by: ulyssesyou <[email protected]>
---
 docs/extensions/engines/spark/rules.md             |  1 +
 .../spark/sql/FinalStageResourceManager.scala      | 28 ++++++++++++++++++++--
 .../org/apache/kyuubi/sql/KyuubiSQLConf.scala      |  7 ++++++
 3 files changed, 34 insertions(+), 2 deletions(-)

diff --git a/docs/extensions/engines/spark/rules.md 
b/docs/extensions/engines/spark/rules.md
index a4bda5d53..46e8dd3d1 100644
--- a/docs/extensions/engines/spark/rules.md
+++ b/docs/extensions/engines/spark/rules.md
@@ -84,6 +84,7 @@ Kyuubi provides some configs to make these feature easy to 
use.
 | spark.sql.optimizer.insertRepartitionBeforeWriteIfNoShuffle.enabled | false  
                                | When true, add repartition even if the 
original plan does not have shuffle.                                            
                                                                                
                                                                                
                                                              | 1.7.0 |
 | spark.sql.optimizer.finalStageConfigIsolationWriteOnly.enabled      | true   
                                | When true, only enable final stage isolation 
for writing.                                                                    
                                                                                
                                                                                
                                                        | 1.7.0 |
 | spark.sql.finalWriteStage.eagerlyKillExecutors.enabled              | false  
                                | When true, eagerly kill redundant executors 
before running final write stage.                                               
                                                                                
                                                                                
                                                         | 1.8.0 |
+| spark.sql.finalWriteStage.skipKillingExecutorsForTableCache         | true   
                                | When true, skip killing executors if the plan 
has table caches.                                                               
                                                                                
                                                                                
                                                       | 1.8.0 |
 | spark.sql.finalWriteStage.retainExecutorsFactor                     | 1.2    
                                | If the target executors * factor < active 
executors, and target executors * factor > min executors, then inject kill 
executors or inject custom resource profile.                                    
                                                                                
                                                                | 1.8.0 |
 | spark.sql.finalWriteStage.resourceIsolation.enabled                 | false  
                                | When true, make final write stage resource 
isolation using custom RDD resource profile.                                    
                                                                                
                                                                                
                                                          | 1.2.0 |
 | spark.sql.finalWriteStageExecutorCores                              | 
fallback spark.executor.cores          | Specify the executor core request for 
final write stage. It would be passed to the RDD resource profile.              
                                                                                
                                                                                
                                                               | 1.8.0 |
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala
 
b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala
index ca3f762e1..7a0ae1592 100644
--- 
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala
+++ 
b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala
@@ -26,6 +26,7 @@ import 
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SortExec, 
SparkPlan}
 import org.apache.spark.sql.execution.adaptive._
+import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, 
ShuffleExchangeExec}
 
 import org.apache.kyuubi.sql.{KyuubiSQLConf, MarkNumOutputColumnsRule}
@@ -69,6 +70,13 @@ case class FinalStageResourceManager(session: SparkSession)
       return plan
     }
 
+    // It's not safe to kill executors if this plan contains table cache.
+    // If the executor loses then the rdd would re-compute those partition.
+    if (hasTableCache(plan) &&
+      
conf.getConf(KyuubiSQLConf.FINAL_WRITE_STAGE_SKIP_KILLING_EXECUTORS_FOR_TABLE_CACHE))
 {
+      return plan
+    }
+
     // TODO: move this to query stage optimizer when updating Spark to 3.5.x
     // Since we are in `prepareQueryStage`, the AQE shuffle read has not been 
applied.
     // So we need to apply it by self.
@@ -188,9 +196,18 @@ case class FinalStageResourceManager(session: SparkSession)
     // see `https://github.com/apache/spark/pull/20604`.
     // It may cause the status in `ExecutorAllocationManager` inconsistent with
     // `CoarseGrainedSchedulerBackend` for a while. But it should be 
synchronous finally.
+    //
+    // We should adjust target num executors, otherwise `YarnAllocator` might 
re-request original
+    // target executors if DRA has not updated target executors yet.
+    // Note, DRA would re-adjust executors if there are more tasks to be 
executed, so we are safe.
+    //
+    //  * We kill executor
+    //      * YarnAllocator re-request target executors
+    //         * DRA can not release executors since they are new added
+    // ----------------------------------------------------------------> 
timeline
     executorAllocationClient.killExecutors(
       executorIds = executorsToKill,
-      adjustTargetNumExecutors = false,
+      adjustTargetNumExecutors = true,
       countFailures = false,
       force = false)
   }
@@ -201,7 +218,7 @@ case class FinalStageResourceManager(session: SparkSession)
     OptimizeShuffleWithLocalRead)
 }
 
-trait FinalRebalanceStageHelper {
+trait FinalRebalanceStageHelper extends AdaptiveSparkPlanHelper {
   @tailrec
   final protected def findFinalRebalanceStage(plan: SparkPlan): 
Option[ShuffleQueryStageExec] = {
     plan match {
@@ -216,4 +233,11 @@ trait FinalRebalanceStageHelper {
       case _ => None
     }
   }
+
+  final protected def hasTableCache(plan: SparkPlan): Boolean = {
+    find(plan) {
+      case _: InMemoryTableScanExec => true
+      case _ => false
+    }.isDefined
+  }
 }
diff --git 
a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
 
b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
index 4df924b51..aeee45869 100644
--- 
a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
+++ 
b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
@@ -198,6 +198,13 @@ object KyuubiSQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val FINAL_WRITE_STAGE_SKIP_KILLING_EXECUTORS_FOR_TABLE_CACHE =
+    buildConf("spark.sql.finalWriteStage.skipKillingExecutorsForTableCache")
+      .doc("When true, skip killing executors if the plan has table caches.")
+      .version("1.8.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val FINAL_WRITE_STAGE_PARTITION_FACTOR =
     buildConf("spark.sql.finalWriteStage.retainExecutorsFactor")
       .doc("If the target executors * factor < active executors, and " +

Reply via email to