This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.9
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.9 by this push:
new e195ebbe3 [KYUUBI #6302][FOLLOWUP] Skip spark job group cancellation
on incremental collect mode
e195ebbe3 is described below
commit e195ebbe396ecff2913454ffa918373ce2f2a94d
Author: xorsum <[email protected]>
AuthorDate: Thu Jun 20 10:49:59 2024 +0800
[KYUUBI #6302][FOLLOWUP] Skip spark job group cancellation on incremental
collect mode
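This pull request fixes
https://github.com/apache/kyuubi/pull/6473#discussion_r1642652411
by adding a configuration that controls whether to skip the Spark job group
cancellation (performed after statement execution finishes) for incremental
collect queries. The cancellation is skipped by default for safety, because
cancelling the job group may cause an incremental collect query to fail.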
- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
---
- [x] This patch was not authored or co-authored using [Generative
Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #6482 from XorSum/features/skip-cancel-incremental.
Closes #6302
440311f07 [xorsum] reformat
edbc37868 [bkhan] Update
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
d6c99366c [xorsum] one line
9f40405c7 [xorsum] update configuration
b1526319e [xorsum] skip job group cancellation on incremental collect mode
Lead-authored-by: xorsum <[email protected]>
Co-authored-by: bkhan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
(cherry picked from commit e2436e44607b8eb4842a8ca53e74a0f944e0cddf)
Signed-off-by: Cheng Pan <[email protected]>
---
.../kyuubi/engine/spark/operation/ExecuteStatement.scala | 9 +++++++--
.../src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala | 11 +++++++++++
2 files changed, 18 insertions(+), 2 deletions(-)
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index 90bfe8336..4f1eb9b53 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.kyuubi.SparkDatasetHelper._
import org.apache.spark.sql.types._
import org.apache.kyuubi.{KyuubiSQLException, Logging}
-import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS,
OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_MIN_ROWS,
OPERATION_RESULT_SAVE_TO_FILE_MINSIZE}
+import
org.apache.kyuubi.config.KyuubiConf.{ENGINE_SPARK_OPERATION_INCREMENTAL_COLLECT_CANCEL_JOB_GROUP,
OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE,
OPERATION_RESULT_SAVE_TO_FILE_MIN_ROWS, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE}
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
import org.apache.kyuubi.engine.spark.session.{SparkSessionImpl,
SparkSQLSessionManager}
import org.apache.kyuubi.operation.{ArrayFetchIterator, FetchIterator,
IterableFetchIterator, OperationHandle, OperationState}
@@ -104,7 +104,12 @@ class ExecuteStatement(
onError(cancel = true)
} finally {
shutdownTimeoutMonitor()
- if (!spark.sparkContext.isStopped)
spark.sparkContext.cancelJobGroup(statementId)
+ if (!spark.sparkContext.isStopped) {
+ if (!incrementalCollect ||
+
getSessionConf(ENGINE_SPARK_OPERATION_INCREMENTAL_COLLECT_CANCEL_JOB_GROUP,
spark)) {
+ spark.sparkContext.cancelJobGroup(statementId)
+ }
+ }
}
override protected def runInternal(): Unit = {
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 909b07802..4dbdfbae3 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -2660,6 +2660,17 @@ object KyuubiConf {
.stringConf
.createWithDefault("yyyy-MM-dd HH:mm:ss.SSS")
+ val ENGINE_SPARK_OPERATION_INCREMENTAL_COLLECT_CANCEL_JOB_GROUP:
ConfigEntry[Boolean] =
+ buildConf(
+
"kyuubi.engine.spark.operation.incremental.collect.cancelJobGroupAfterExecutionFinished")
+ .internal
+ .doc("Canceling jobs group that are still running after statement
execution finished " +
+ "avoids wasting resources. But the cancellation may cause the query
fail when using " +
+ "incremental collect mode.")
+ .version("1.9.2")
+ .booleanConf
+ .createWithDefault(false)
+
val ENGINE_SESSION_SPARK_INITIALIZE_SQL: ConfigEntry[Seq[String]] =
buildConf("kyuubi.session.engine.spark.initialize.sql")
.doc("The initialize sql for Spark session. " +