This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new e2436e446 [KYUUBI #6302][FOLLOWUP] Skip spark job group cancellation 
on incremental collect mode
e2436e446 is described below

commit e2436e44607b8eb4842a8ca53e74a0f944e0cddf
Author: xorsum <[email protected]>
AuthorDate: Thu Jun 20 10:49:59 2024 +0800

    [KYUUBI #6302][FOLLOWUP] Skip spark job group cancellation on incremental 
collect mode
    
    # :mag: Description
    ## Issue References 🔗
    
    This pull request fixes 
https://github.com/apache/kyuubi/pull/6473#discussion_r1642652411
    
    ## Describe Your Solution ๐Ÿ”ง
    
    add a configuration to control whether to skip the cancellation here for 
incremental collect queries, skipping by default for safety.
    
    ## Types of changes :bookmark:
    
    - [x] Bugfix (non-breaking change which fixes an issue)
    - [ ] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing 
functionality to change)
    
    ## Test Plan ๐Ÿงช
    
    #### Behavior Without This Pull Request :coffin:
    
    #### Behavior With This Pull Request :tada:
    
    #### Related Unit Tests
    
    ---
    
    # Checklist ๐Ÿ“
    
    - [x] This patch was not authored or co-authored using [Generative 
Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    **Be nice. Be informative.**
    
    Closes #6482 from XorSum/features/skip-cancel-incremental.
    
    Closes #6302
    
    440311f07 [xorsum] reformat
    edbc37868 [bkhan] Update 
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
    d6c99366c [xorsum] one line
    9f40405c7 [xorsum] update configuration
    b1526319e [xorsum] skip job group cancellation on incremental collect mode
    
    Lead-authored-by: xorsum <[email protected]>
    Co-authored-by: bkhan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../kyuubi/engine/spark/operation/ExecuteStatement.scala      |  9 +++++++--
 .../src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala  | 11 +++++++++++
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index d2f713124..ca60de38f 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.kyuubi.SparkDatasetHelper._
 import org.apache.spark.sql.types._
 
 import org.apache.kyuubi.{KyuubiSQLException, Logging}
-import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS, 
OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_MIN_ROWS, 
OPERATION_RESULT_SAVE_TO_FILE_MINSIZE}
+import 
org.apache.kyuubi.config.KyuubiConf.{ENGINE_SPARK_OPERATION_INCREMENTAL_COLLECT_CANCEL_JOB_GROUP,
 OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE, 
OPERATION_RESULT_SAVE_TO_FILE_MIN_ROWS, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE}
 import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
 import org.apache.kyuubi.engine.spark.session.{SparkSessionImpl, 
SparkSQLSessionManager}
 import org.apache.kyuubi.operation.{ArrayFetchIterator, FetchIterator, 
IterableFetchIterator, OperationHandle, OperationState}
@@ -104,7 +104,12 @@ class ExecuteStatement(
       onError(cancel = true)
     } finally {
       shutdownTimeoutMonitor()
-      if (!spark.sparkContext.isStopped) 
spark.sparkContext.cancelJobGroup(statementId)
+      if (!spark.sparkContext.isStopped) {
+        if (!incrementalCollect ||
+          
getSessionConf(ENGINE_SPARK_OPERATION_INCREMENTAL_COLLECT_CANCEL_JOB_GROUP, 
spark)) {
+          spark.sparkContext.cancelJobGroup(statementId)
+        }
+      }
     }
 
   override protected def runInternal(): Unit = {
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 344806dc9..af60823d2 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -2761,6 +2761,17 @@ object KyuubiConf {
       .version("1.10.0")
       .fallbackConf(OPERATION_INCREMENTAL_COLLECT)
 
+  val ENGINE_SPARK_OPERATION_INCREMENTAL_COLLECT_CANCEL_JOB_GROUP: 
ConfigEntry[Boolean] =
+    buildConf(
+      
"kyuubi.engine.spark.operation.incremental.collect.cancelJobGroupAfterExecutionFinished")
+      .internal
+      .doc("Canceling jobs group that are still running after statement 
execution finished " +
+        "avoids wasting resources. But the cancellation may cause the query 
fail when using " +
+        "incremental collect mode.")
+      .version("1.10.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val ENGINE_SESSION_SPARK_INITIALIZE_SQL: ConfigEntry[Seq[String]] =
     buildConf("kyuubi.session.engine.spark.initialize.sql")
       .doc("The initialize sql for Spark session. " +

Reply via email to