This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.8 by this push:
new deb9221b2 [KYUUBI #5730] Tolerate execeptions for periodical scheduled
tasks
deb9221b2 is described below
commit deb9221b2ea9bb5aa63ba3137f665c665e82dea1
Author: Bowen Liang <[email protected]>
AuthorDate: Mon Nov 20 20:36:25 2023 +0800
[KYUUBI #5730] Tolerate execeptions for periodical scheduled tasks
# :mag: Description
## Issue References ๐
As the PR https://github.com/apache/kyuubi/pull/5727 fixed a problem caused
by leaking exception in a scheduled task, the exception thrown should be
properly handled to prevent suspension in
`ScheduledExecutorService.scheduleWithFixedDelay`.
## Describe Your Solution ๐ง
Introducing a util method `scheduleTolerableRunnableWithFixedDelay` for
catching possible exceptions in scheduled tasks.
## Types of changes :bookmark:
- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
## Test Plan ๐งช
#### Behavior Without This Pull Request :coffin:
No behaviour change.
#### Behavior With This Pull Request :tada:
This is a fallback measure to ensure all the exceptions handled.
Currently all the occurrences are properly handled.
#### Related Unit Tests
Pass all the CI tests.
---
# Checklists
## ๐ Author Self Checklist
- [x] My code follows the [style
guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html)
of this project
- [x] I have performed a self-review
- [ ] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my feature
works
- [ ] New and existing unit tests pass locally with my changes
- [x] This patch was not authored or co-authored using [Generative
Tooling](https://www.apache.org/legal/generative-tooling.html)
## ๐ Committer Pre-Merge Checklist
- [ ] Pull request title is okay.
- [ ] No license issues.
- [ ] Milestone correctly set?
- [ ] Test coverage is ok
- [ ] Assignees are selected.
- [ ] Minimum number of approvals
- [ ] No changes are requested
**Be nice. Be informative.**
Closes #5730 from bowenliang123/tolerate-runnable.
Closes #5730
a38eda075 [Bowen Liang] use thread name for error message
6bc13b775 [Bowen Liang] comment
87e2bf253 [Bowen Liang] schedule tolerable runnable
Authored-by: Bowen Liang <[email protected]>
Signed-off-by: fwang12 <[email protected]>
(cherry picked from commit 84806054638a0ad145e81c67c804661f3673a432)
Signed-off-by: fwang12 <[email protected]>
---
.../kyuubi/engine/spark/SparkSQLEngine.scala | 4 +++-
.../spark/session/SparkSQLSessionManager.scala | 6 ++++--
.../org/apache/kyuubi/session/SessionManager.scala | 15 +++++++++++--
.../scala/org/apache/kyuubi/util/ThreadUtils.scala | 25 +++++++++++++++++++++-
.../kyuubi/client/KyuubiSyncThriftClient.scala | 4 +++-
.../credentials/HadoopCredentialsManager.scala | 4 +++-
.../kyuubi/server/KyuubiRestFrontendService.scala | 8 ++++++-
.../apache/kyuubi/server/PeriodicGCService.scala | 8 ++++++-
.../kyuubi/server/metadata/MetadataManager.scala | 8 +++++--
.../kyuubi/session/KyuubiSessionManager.scala | 4 +++-
10 files changed, 73 insertions(+), 13 deletions(-)
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index 6e323cfe7..dbf5075a1 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -46,6 +46,7 @@ import org.apache.kyuubi.ha.client.RetryPolicies
import org.apache.kyuubi.service.Serverable
import org.apache.kyuubi.session.SessionHandle
import org.apache.kyuubi.util.{SignalRegister, ThreadUtils}
+import
org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay
case class SparkSQLEngine(spark: SparkSession) extends
Serverable("SparkSQLEngine") {
@@ -167,7 +168,8 @@ case class SparkSQLEngine(spark: SparkSession) extends
Serverable("SparkSQLEngin
}
lifetimeTerminatingChecker =
Some(ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-engine-lifetime-checker"))
- lifetimeTerminatingChecker.get.scheduleWithFixedDelay(
+ scheduleTolerableRunnableWithFixedDelay(
+ lifetimeTerminatingChecker.get,
checkTask,
interval,
interval,
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
index 79f38ce35..7ebcc8d37 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
@@ -32,6 +32,7 @@ import org.apache.kyuubi.engine.spark.{KyuubiSparkUtil,
SparkSQLEngine}
import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
import org.apache.kyuubi.session._
import org.apache.kyuubi.util.ThreadUtils
+import
org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay
/**
* A [[SessionManager]] constructed with [[SparkSession]] which give it the
ability to talk with
@@ -66,8 +67,9 @@ class SparkSQLSessionManager private (name: String, spark:
SparkSession)
if (!userIsolatedSparkSession) {
userIsolatedSparkSessionThread =
Some(ThreadUtils.newDaemonSingleThreadScheduledExecutor("user-isolated-cache-checker"))
- userIsolatedSparkSessionThread.foreach {
- _.scheduleWithFixedDelay(
+ userIsolatedSparkSessionThread.foreach { thread =>
+ scheduleTolerableRunnableWithFixedDelay(
+ thread,
() => {
userIsolatedCacheLock.synchronized {
val iter = userIsolatedCacheCount.entrySet().iterator()
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
index 4a3d25689..e3ead7486 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
@@ -33,6 +33,7 @@ import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.operation.OperationManager
import org.apache.kyuubi.service.CompositeService
import org.apache.kyuubi.util.ThreadUtils
+import
org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay
/**
* The [[SessionManager]] holds the all the connected [[Session]]s, provides
us the APIs to
@@ -324,7 +325,12 @@ abstract class SessionManager(name: String) extends
CompositeService(name) {
}
}
- timeoutChecker.scheduleWithFixedDelay(checkTask, interval, interval,
TimeUnit.MILLISECONDS)
+ scheduleTolerableRunnableWithFixedDelay(
+ timeoutChecker,
+ checkTask,
+ interval,
+ interval,
+ TimeUnit.MILLISECONDS)
}
private[kyuubi] def startTerminatingChecker(stop: () => Unit): Unit = if
(!isServer) {
@@ -342,7 +348,12 @@ abstract class SessionManager(name: String) extends
CompositeService(name) {
}
}
}
- timeoutChecker.scheduleWithFixedDelay(checkTask, interval, interval,
TimeUnit.MILLISECONDS)
+ scheduleTolerableRunnableWithFixedDelay(
+ timeoutChecker,
+ checkTask,
+ interval,
+ interval,
+ TimeUnit.MILLISECONDS)
}
}
}
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
index 76d3f416f..aeab37b6f 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
@@ -17,7 +17,7 @@
package org.apache.kyuubi.util
-import java.util.concurrent.{Executors, ExecutorService, LinkedBlockingQueue,
ScheduledExecutorService, ScheduledThreadPoolExecutor, ThreadPoolExecutor,
TimeUnit}
+import java.util.concurrent._
import scala.concurrent.Awaitable
import scala.concurrent.duration.{Duration, FiniteDuration}
@@ -109,4 +109,27 @@ object ThreadUtils extends Logging {
thread.setUncaughtExceptionHandler(NamedThreadFactory.kyuubiUncaughtExceptionHandler)
thread.start()
}
+
+ /**
+ * Schedule a runnable to the scheduled executor service.
+ * The exceptions thrown in the runnable will be caught and logged.
+ */
+ def scheduleTolerableRunnableWithFixedDelay(
+ scheduler: ScheduledExecutorService,
+ runnable: Runnable,
+ initialDelay: Long,
+ delay: Long,
+ timeUnit: TimeUnit): Unit = {
+ scheduler.scheduleWithFixedDelay(
+ () =>
+ try {
+ runnable.run()
+ } catch {
+ case t: Throwable =>
+ error(s"Uncaught exception in thread
${Thread.currentThread().getName}", t)
+ },
+ initialDelay,
+ delay,
+ timeUnit)
+ }
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
index ad7191c09..9f61021b0 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
@@ -39,6 +39,7 @@ import
org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.service.authentication.PlainSASLHelper
import org.apache.kyuubi.session.SessionHandle
import org.apache.kyuubi.util.{ThreadUtils, ThriftUtils}
+import
org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay
class KyuubiSyncThriftClient private (
protocol: TProtocol,
@@ -125,7 +126,8 @@ class KyuubiSyncThriftClient private (
}
}
engineLastAlive = System.currentTimeMillis()
- engineAliveThreadPool.scheduleWithFixedDelay(
+ scheduleTolerableRunnableWithFixedDelay(
+ engineAliveThreadPool,
task,
engineAliveProbeInterval,
engineAliveProbeInterval,
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
index 60e3daff3..92b201718 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
@@ -33,6 +33,7 @@ import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.service.AbstractService
import org.apache.kyuubi.util.{KyuubiHadoopUtils, ThreadUtils}
+import
org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay
import org.apache.kyuubi.util.reflect.ReflectUtils._
/**
@@ -299,7 +300,8 @@ class HadoopCredentialsManager private (name: String)
extends AbstractService(na
}
credentialsTimeoutChecker.foreach { executor =>
- executor.scheduleWithFixedDelay(
+ scheduleTolerableRunnableWithFixedDelay(
+ executor,
checkTask,
credentialsCheckInterval,
credentialsCheckInterval,
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
index 5335ff61a..65b108468 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
@@ -38,6 +38,7 @@ import org.apache.kyuubi.service.{AbstractFrontendService,
Serverable, Service,
import org.apache.kyuubi.service.authentication.{AuthMethods, AuthTypes,
KyuubiAuthenticationFactory}
import org.apache.kyuubi.session.{KyuubiSessionManager, SessionHandle}
import org.apache.kyuubi.util.ThreadUtils
+import
org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay
/**
* A frontend service based on RESTful api via HTTP protocol.
@@ -142,7 +143,12 @@ class KyuubiRestFrontendService(override val serverable:
Serverable)
}
}
- batchChecker.scheduleWithFixedDelay(task, interval, interval,
TimeUnit.MILLISECONDS)
+ scheduleTolerableRunnableWithFixedDelay(
+ batchChecker,
+ task,
+ interval,
+ interval,
+ TimeUnit.MILLISECONDS)
}
@VisibleForTesting
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/PeriodicGCService.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/PeriodicGCService.scala
index a4035b689..4ec6f4c12 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/PeriodicGCService.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/PeriodicGCService.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.service.AbstractService
import org.apache.kyuubi.util.ThreadUtils
+import
org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay
class PeriodicGCService(name: String) extends AbstractService(name) {
def this() = this(classOf[PeriodicGCService].getSimpleName)
@@ -40,6 +41,11 @@ class PeriodicGCService(name: String) extends
AbstractService(name) {
private def startGcTrigger(): Unit = {
val interval = conf.get(KyuubiConf.SERVER_PERIODIC_GC_INTERVAL)
- gcTrigger.scheduleWithFixedDelay(() => System.gc(), interval, interval,
TimeUnit.MILLISECONDS)
+ scheduleTolerableRunnableWithFixedDelay(
+ gcTrigger,
+ () => System.gc(),
+ interval,
+ interval,
+ TimeUnit.MILLISECONDS)
}
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
index 1da9e1f31..6dd0e76e0 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
@@ -32,6 +32,7 @@ import org.apache.kyuubi.server.metadata.api.{Metadata,
MetadataFilter}
import org.apache.kyuubi.service.AbstractService
import org.apache.kyuubi.session.SessionType
import org.apache.kyuubi.util.{ClassUtils, JdbcUtils, ThreadUtils}
+import
org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay
class MetadataManager extends AbstractService("MetadataManager") {
import MetadataManager._
@@ -209,7 +210,8 @@ class MetadataManager extends
AbstractService("MetadataManager") {
}
}
- metadataCleaner.scheduleWithFixedDelay(
+ scheduleTolerableRunnableWithFixedDelay(
+ metadataCleaner,
cleanerTask,
interval,
interval,
@@ -298,7 +300,9 @@ class MetadataManager extends
AbstractService("MetadataManager") {
}
}
}
- requestsAsyncRetryTrigger.scheduleWithFixedDelay(
+
+ scheduleTolerableRunnableWithFixedDelay(
+ requestsAsyncRetryTrigger,
triggerTask,
requestsRetryInterval,
requestsRetryInterval,
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 02a3ee32c..c57cfbf7f 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -41,6 +41,7 @@ import org.apache.kyuubi.server.metadata.{MetadataManager,
MetadataRequestsRetry
import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
import org.apache.kyuubi.sql.parser.server.KyuubiParser
import org.apache.kyuubi.util.{SignUtils, ThreadUtils}
+import
org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay
class KyuubiSessionManager private (name: String) extends SessionManager(name)
{
@@ -409,7 +410,8 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
}
}
}
- engineConnectionAliveChecker.scheduleWithFixedDelay(
+ scheduleTolerableRunnableWithFixedDelay(
+ engineConnectionAliveChecker,
checkTask,
interval,
interval,