This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 848060546 [KYUUBI #5730] Tolerate execeptions for periodical scheduled 
tasks
848060546 is described below

commit 84806054638a0ad145e81c67c804661f3673a432
Author: Bowen Liang <[email protected]>
AuthorDate: Mon Nov 20 20:36:25 2023 +0800

    [KYUUBI #5730] Tolerate execeptions for periodical scheduled tasks
    
    # :mag: Description
    ## Issue References ๐Ÿ”—
    
    As the PR https://github.com/apache/kyuubi/pull/5727 fixed a problem caused 
by leaking exception in a scheduled task, the exception thrown should be 
properly handled to prevent suspension in 
`ScheduledExecutorService.scheduleWithFixedDelay`.
    
    ## Describe Your Solution ๐Ÿ”ง
    Introducing a util method `scheduleTolerableRunnableWithFixedDelay` for 
catching possible exceptions in scheduled tasks.
    
    ## Types of changes :bookmark:
    
    - [ ] Bugfix (non-breaking change which fixes an issue)
    - [ ] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing 
functionality to change)
    
    ## Test Plan ๐Ÿงช
    
    #### Behavior Without This Pull Request :coffin:
    No behaviour change.
    
    #### Behavior With This Pull Request :tada:
    This is a fallback measure to ensure all the exceptions handled.
    Currently all the occurrences are properly handled.
    
    #### Related Unit Tests
    Pass all the CI tests.
    
    ---
    
    # Checklists
    ## ๐Ÿ“ Author Self Checklist
    
    - [x] My code follows the [style 
guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html)
 of this project
    - [x] I have performed a self-review
    - [ ] I have commented my code, particularly in hard-to-understand areas
    - [ ] I have made corresponding changes to the documentation
    - [x] My changes generate no new warnings
    - [ ] I have added tests that prove my fix is effective or that my feature 
works
    - [ ] New and existing unit tests pass locally with my changes
    - [x] This patch was not authored or co-authored using [Generative 
Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    ## ๐Ÿ“ Committer Pre-Merge Checklist
    
    - [ ] Pull request title is okay.
    - [ ] No license issues.
    - [ ] Milestone correctly set?
    - [ ] Test coverage is ok
    - [ ] Assignees are selected.
    - [ ] Minimum number of approvals
    - [ ] No changes are requested
    
    **Be nice. Be informative.**
    
    Closes #5730 from bowenliang123/tolerate-runnable.
    
    Closes #5730
    
    a38eda075 [Bowen Liang] use thread name for error message
    6bc13b775 [Bowen Liang] comment
    87e2bf253 [Bowen Liang] schedule tolerable runnable
    
    Authored-by: Bowen Liang <[email protected]>
    Signed-off-by: fwang12 <[email protected]>
---
 .../kyuubi/engine/spark/SparkSQLEngine.scala       |  4 +++-
 .../spark/session/SparkSQLSessionManager.scala     |  6 ++++--
 .../org/apache/kyuubi/session/SessionManager.scala | 15 +++++++++++--
 .../scala/org/apache/kyuubi/util/ThreadUtils.scala | 25 +++++++++++++++++++++-
 .../kyuubi/client/KyuubiSyncThriftClient.scala     |  4 +++-
 .../credentials/HadoopCredentialsManager.scala     |  4 +++-
 .../kyuubi/server/KyuubiRestFrontendService.scala  |  8 ++++++-
 .../apache/kyuubi/server/PeriodicGCService.scala   |  8 ++++++-
 .../kyuubi/server/metadata/MetadataManager.scala   |  8 +++++--
 .../kyuubi/session/KyuubiSessionManager.scala      |  4 +++-
 10 files changed, 73 insertions(+), 13 deletions(-)

diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index 6e323cfe7..dbf5075a1 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -46,6 +46,7 @@ import org.apache.kyuubi.ha.client.RetryPolicies
 import org.apache.kyuubi.service.Serverable
 import org.apache.kyuubi.session.SessionHandle
 import org.apache.kyuubi.util.{SignalRegister, ThreadUtils}
+import 
org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay
 
 case class SparkSQLEngine(spark: SparkSession) extends 
Serverable("SparkSQLEngine") {
 
@@ -167,7 +168,8 @@ case class SparkSQLEngine(spark: SparkSession) extends 
Serverable("SparkSQLEngin
       }
       lifetimeTerminatingChecker =
         
Some(ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-engine-lifetime-checker"))
-      lifetimeTerminatingChecker.get.scheduleWithFixedDelay(
+      scheduleTolerableRunnableWithFixedDelay(
+        lifetimeTerminatingChecker.get,
         checkTask,
         interval,
         interval,
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
index 79f38ce35..7ebcc8d37 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
@@ -32,6 +32,7 @@ import org.apache.kyuubi.engine.spark.{KyuubiSparkUtil, 
SparkSQLEngine}
 import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
 import org.apache.kyuubi.session._
 import org.apache.kyuubi.util.ThreadUtils
+import 
org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay
 
 /**
  * A [[SessionManager]] constructed with [[SparkSession]] which give it the 
ability to talk with
@@ -66,8 +67,9 @@ class SparkSQLSessionManager private (name: String, spark: 
SparkSession)
     if (!userIsolatedSparkSession) {
       userIsolatedSparkSessionThread =
         
Some(ThreadUtils.newDaemonSingleThreadScheduledExecutor("user-isolated-cache-checker"))
-      userIsolatedSparkSessionThread.foreach {
-        _.scheduleWithFixedDelay(
+      userIsolatedSparkSessionThread.foreach { thread =>
+        scheduleTolerableRunnableWithFixedDelay(
+          thread,
           () => {
             userIsolatedCacheLock.synchronized {
               val iter = userIsolatedCacheCount.entrySet().iterator()
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
index 4a3d25689..e3ead7486 100644
--- 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
@@ -33,6 +33,7 @@ import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.operation.OperationManager
 import org.apache.kyuubi.service.CompositeService
 import org.apache.kyuubi.util.ThreadUtils
+import 
org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay
 
 /**
  * The [[SessionManager]] holds the all the connected [[Session]]s, provides 
us the APIs to
@@ -324,7 +325,12 @@ abstract class SessionManager(name: String) extends 
CompositeService(name) {
       }
     }
 
-    timeoutChecker.scheduleWithFixedDelay(checkTask, interval, interval, 
TimeUnit.MILLISECONDS)
+    scheduleTolerableRunnableWithFixedDelay(
+      timeoutChecker,
+      checkTask,
+      interval,
+      interval,
+      TimeUnit.MILLISECONDS)
   }
 
   private[kyuubi] def startTerminatingChecker(stop: () => Unit): Unit = if 
(!isServer) {
@@ -342,7 +348,12 @@ abstract class SessionManager(name: String) extends 
CompositeService(name) {
           }
         }
       }
-      timeoutChecker.scheduleWithFixedDelay(checkTask, interval, interval, 
TimeUnit.MILLISECONDS)
+      scheduleTolerableRunnableWithFixedDelay(
+        timeoutChecker,
+        checkTask,
+        interval,
+        interval,
+        TimeUnit.MILLISECONDS)
     }
   }
 }
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
index 76d3f416f..aeab37b6f 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
@@ -17,7 +17,7 @@
 
 package org.apache.kyuubi.util
 
-import java.util.concurrent.{Executors, ExecutorService, LinkedBlockingQueue, 
ScheduledExecutorService, ScheduledThreadPoolExecutor, ThreadPoolExecutor, 
TimeUnit}
+import java.util.concurrent._
 
 import scala.concurrent.Awaitable
 import scala.concurrent.duration.{Duration, FiniteDuration}
@@ -109,4 +109,27 @@ object ThreadUtils extends Logging {
     
thread.setUncaughtExceptionHandler(NamedThreadFactory.kyuubiUncaughtExceptionHandler)
     thread.start()
   }
+
+  /**
+   * Schedule a runnable to the scheduled executor service.
+   * The exceptions thrown in the runnable will be caught and logged.
+   */
+  def scheduleTolerableRunnableWithFixedDelay(
+      scheduler: ScheduledExecutorService,
+      runnable: Runnable,
+      initialDelay: Long,
+      delay: Long,
+      timeUnit: TimeUnit): Unit = {
+    scheduler.scheduleWithFixedDelay(
+      () =>
+        try {
+          runnable.run()
+        } catch {
+          case t: Throwable =>
+            error(s"Uncaught exception in thread 
${Thread.currentThread().getName}", t)
+        },
+      initialDelay,
+      delay,
+      timeUnit)
+  }
 }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
index ad7191c09..9f61021b0 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
@@ -39,6 +39,7 @@ import 
org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
 import org.apache.kyuubi.service.authentication.PlainSASLHelper
 import org.apache.kyuubi.session.SessionHandle
 import org.apache.kyuubi.util.{ThreadUtils, ThriftUtils}
+import 
org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay
 
 class KyuubiSyncThriftClient private (
     protocol: TProtocol,
@@ -125,7 +126,8 @@ class KyuubiSyncThriftClient private (
       }
     }
     engineLastAlive = System.currentTimeMillis()
-    engineAliveThreadPool.scheduleWithFixedDelay(
+    scheduleTolerableRunnableWithFixedDelay(
+      engineAliveThreadPool,
       task,
       engineAliveProbeInterval,
       engineAliveProbeInterval,
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
index 60e3daff3..92b201718 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala
@@ -33,6 +33,7 @@ import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.service.AbstractService
 import org.apache.kyuubi.util.{KyuubiHadoopUtils, ThreadUtils}
+import 
org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay
 import org.apache.kyuubi.util.reflect.ReflectUtils._
 
 /**
@@ -299,7 +300,8 @@ class HadoopCredentialsManager private (name: String) 
extends AbstractService(na
     }
 
     credentialsTimeoutChecker.foreach { executor =>
-      executor.scheduleWithFixedDelay(
+      scheduleTolerableRunnableWithFixedDelay(
+        executor,
         checkTask,
         credentialsCheckInterval,
         credentialsCheckInterval,
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
index 376c97934..d73899513 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
@@ -38,6 +38,7 @@ import org.apache.kyuubi.service.{AbstractFrontendService, 
Serverable, Service,
 import org.apache.kyuubi.service.authentication.{AuthMethods, AuthTypes, 
KyuubiAuthenticationFactory}
 import org.apache.kyuubi.session.{KyuubiSessionManager, SessionHandle}
 import org.apache.kyuubi.util.ThreadUtils
+import 
org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay
 
 /**
  * A frontend service based on RESTful api via HTTP protocol.
@@ -142,7 +143,12 @@ class KyuubiRestFrontendService(override val serverable: 
Serverable)
       }
     }
 
-    batchChecker.scheduleWithFixedDelay(task, interval, interval, 
TimeUnit.MILLISECONDS)
+    scheduleTolerableRunnableWithFixedDelay(
+      batchChecker,
+      task,
+      interval,
+      interval,
+      TimeUnit.MILLISECONDS)
   }
 
   @VisibleForTesting
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/PeriodicGCService.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/PeriodicGCService.scala
index a4035b689..4ec6f4c12 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/PeriodicGCService.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/PeriodicGCService.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.service.AbstractService
 import org.apache.kyuubi.util.ThreadUtils
+import 
org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay
 
 class PeriodicGCService(name: String) extends AbstractService(name) {
   def this() = this(classOf[PeriodicGCService].getSimpleName)
@@ -40,6 +41,11 @@ class PeriodicGCService(name: String) extends 
AbstractService(name) {
 
   private def startGcTrigger(): Unit = {
     val interval = conf.get(KyuubiConf.SERVER_PERIODIC_GC_INTERVAL)
-    gcTrigger.scheduleWithFixedDelay(() => System.gc(), interval, interval, 
TimeUnit.MILLISECONDS)
+    scheduleTolerableRunnableWithFixedDelay(
+      gcTrigger,
+      () => System.gc(),
+      interval,
+      interval,
+      TimeUnit.MILLISECONDS)
   }
 }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
index 1da9e1f31..6dd0e76e0 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
@@ -32,6 +32,7 @@ import org.apache.kyuubi.server.metadata.api.{Metadata, 
MetadataFilter}
 import org.apache.kyuubi.service.AbstractService
 import org.apache.kyuubi.session.SessionType
 import org.apache.kyuubi.util.{ClassUtils, JdbcUtils, ThreadUtils}
+import 
org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay
 
 class MetadataManager extends AbstractService("MetadataManager") {
   import MetadataManager._
@@ -209,7 +210,8 @@ class MetadataManager extends 
AbstractService("MetadataManager") {
       }
     }
 
-    metadataCleaner.scheduleWithFixedDelay(
+    scheduleTolerableRunnableWithFixedDelay(
+      metadataCleaner,
       cleanerTask,
       interval,
       interval,
@@ -298,7 +300,9 @@ class MetadataManager extends 
AbstractService("MetadataManager") {
         }
       }
     }
-    requestsAsyncRetryTrigger.scheduleWithFixedDelay(
+
+    scheduleTolerableRunnableWithFixedDelay(
+      requestsAsyncRetryTrigger,
       triggerTask,
       requestsRetryInterval,
       requestsRetryInterval,
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 4c625ce67..72e2eb967 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -41,6 +41,7 @@ import org.apache.kyuubi.server.metadata.{MetadataManager, 
MetadataRequestsRetry
 import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
 import org.apache.kyuubi.sql.parser.server.KyuubiParser
 import org.apache.kyuubi.util.{SignUtils, ThreadUtils}
+import 
org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay
 
 class KyuubiSessionManager private (name: String) extends SessionManager(name) 
{
 
@@ -409,7 +410,8 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
         }
       }
     }
-    engineConnectionAliveChecker.scheduleWithFixedDelay(
+    scheduleTolerableRunnableWithFixedDelay(
+      engineConnectionAliveChecker,
       checkTask,
       interval,
       interval,

Reply via email to