This is an automated email from the ASF dual-hosted git repository. chengpan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kyuubi.git
commit efdb67ff3991b92b05cce79c2cd42a6804b97b73 Author: xorsum <[email protected]> AuthorDate: Thu Jun 20 10:49:59 2024 +0800 [KYUUBI #6302][FOLLOWUP] Skip spark job group cancellation on incremental collect mode # :mag: Description ## Issue References 🔗 This pull request fixes https://github.com/apache/kyuubi/pull/6473#discussion_r1642652411 ## Describe Your Solution 🔧 Add a configuration that controls whether the Spark job group cancellation after statement execution is skipped for incremental collect queries; the cancellation is skipped by default for safety. ## Types of changes :bookmark: - [x] Bugfix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan 🧪 #### Behavior Without This Pull Request :coffin: #### Behavior With This Pull Request :tada: #### Related Unit Tests --- # Checklist 📝 - [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) **Be nice. Be informative.** Closes #6482 from XorSum/features/skip-cancel-incremental. 
Closes #6302 440311f07 [xorsum] reformat edbc37868 [bkhan] Update externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala d6c99366c [xorsum] one line 9f40405c7 [xorsum] update configuration b1526319e [xorsum] skip job group cancellation on incremental collect mode Lead-authored-by: xorsum <[email protected]> Co-authored-by: bkhan <[email protected]> Signed-off-by: Cheng Pan <[email protected]> --- .../kyuubi/engine/spark/operation/ExecuteStatement.scala | 9 +++++++-- .../src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala | 11 +++++++++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala index d2f713124..ca60de38f 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.kyuubi.SparkDatasetHelper._ import org.apache.spark.sql.types._ import org.apache.kyuubi.{KyuubiSQLException, Logging} -import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_MIN_ROWS, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE} +import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SPARK_OPERATION_INCREMENTAL_COLLECT_CANCEL_JOB_GROUP, OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_MIN_ROWS, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE} import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._ import org.apache.kyuubi.engine.spark.session.{SparkSessionImpl, SparkSQLSessionManager} import org.apache.kyuubi.operation.{ArrayFetchIterator, FetchIterator, IterableFetchIterator, 
OperationHandle, OperationState} @@ -104,7 +104,12 @@ class ExecuteStatement( onError(cancel = true) } finally { shutdownTimeoutMonitor() - if (!spark.sparkContext.isStopped) spark.sparkContext.cancelJobGroup(statementId) + if (!spark.sparkContext.isStopped) { + if (!incrementalCollect || + getSessionConf(ENGINE_SPARK_OPERATION_INCREMENTAL_COLLECT_CANCEL_JOB_GROUP, spark)) { + spark.sparkContext.cancelJobGroup(statementId) + } + } } override protected def runInternal(): Unit = { diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index 344806dc9..422eb4718 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -2761,6 +2761,17 @@ object KyuubiConf { .version("1.10.0") .fallbackConf(OPERATION_INCREMENTAL_COLLECT) + val ENGINE_SPARK_OPERATION_INCREMENTAL_COLLECT_CANCEL_JOB_GROUP: ConfigEntry[Boolean] = + buildConf( + "kyuubi.engine.spark.operation.incremental.collect.cancelJobGroupAfterExecutionFinished") + .internal + .doc("Canceling jobs group that are still running after statement execution finished " + + "avoids wasting resources. But the cancellation may cause the query fail when using " + + "incremental collect mode.") + .version("1.9.2") + .booleanConf + .createWithDefault(false) + val ENGINE_SESSION_SPARK_INITIALIZE_SQL: ConfigEntry[Seq[String]] = buildConf("kyuubi.session.engine.spark.initialize.sql") .doc("The initialize sql for Spark session. " +
