This is an automated email from the ASF dual-hosted git repository.

ggal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git


The following commit(s) were added to refs/heads/master by this push:
     new 6097af1c [LIVY-1003][RSC] Interactive session - Setting large value of 
rsc.server.connect.timeout blocks other tasks
6097af1c is described below

commit 6097af1cdd536ebbe1d7eacb1513a440a5fa2784
Author: wangdengshan <[email protected]>
AuthorDate: Fri Sep 13 16:10:30 2024 +0800

    [LIVY-1003][RSC] Interactive session - Setting large value of 
rsc.server.connect.timeout blocks other tasks
    
    ## What changes were proposed in this pull request?
    
    This change runs asynchronous session creation and shutdown on a dedicated 
thread pool instead of Scala's default one. The default pool's size is limited, 
so long-waiting tasks can occupy every thread and block other tasks.
    
    https://issues.apache.org/jira/browse/LIVY-1003
    
    ## How was this patch tested?
    
    How to reproduce:
    1. Set `livy.rsc.server.connect.timeout` to something high like 24h.
    2. Create enough interactive Livy sessions in YARN so that they are queued 
in the ACCEPTED state. The number of sessions stuck in the ACCEPTED state 
should equal the global execution context's [thread pool 
size](https://docs.scala-lang.org/overviews/core/futures.html#the-global-execution-context)
 (Runtime.availableProcessors).
    3. Try to delete a session using DELETE /sessions/{sessionId};
    the request should not hang until one of the sessions is no longer stuck in 
ACCEPTED state.
---
 .../src/main/scala/org/apache/livy/LivyConf.scala  |  4 +++
 .../server/interactive/InteractiveSession.scala    | 13 ++++++---
 .../scala/org/apache/livy/sessions/Session.scala   | 34 ++++++++++++++++++++--
 3 files changed, 45 insertions(+), 6 deletions(-)

diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala 
b/server/src/main/scala/org/apache/livy/LivyConf.scala
index 25e6ea80..e798012e 100644
--- a/server/src/main/scala/org/apache/livy/LivyConf.scala
+++ b/server/src/main/scala/org/apache/livy/LivyConf.scala
@@ -138,6 +138,10 @@ object LivyConf {
     Entry("livy.server.thrift.async.exec.wait.queue.size", 100)
   val THRIFT_ASYNC_EXEC_KEEPALIVE_TIME =
     Entry("livy.server.thrift.async.exec.keepalive.time", "10s")
+  val SESSION_MANAGE_THREADS = Entry("livy.server.session.manage.threads", 200)
+  val SESSION_MANAGE_SHUTDOWN_TIMEOUT = 
Entry("livy.server.session.manage.shutdown.timeout", "10s")
+  val SESSION_MANAGE_WAIT_QUEUE_SIZE = 
Entry("livy.server.session.manage.wait.queue.size", 100)
+  val SESSION_MANAGE_KEEPALIVE_TIME = 
Entry("livy.server.session.manage.keepalive.time", "10s")
   val THRIFT_BIND_HOST = Entry("livy.server.thrift.bind.host", null)
   val THRIFT_WORKER_KEEPALIVE_TIME = 
Entry("livy.server.thrift.worker.keepalive.time", "60s")
   val THRIFT_MIN_WORKER_THREADS = 
Entry("livy.server.thrift.min.worker.threads", 5)
diff --git 
a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
 
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
index d8c4a16b..4250794d 100644
--- 
a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
+++ 
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
@@ -476,15 +476,20 @@ class InteractiveSession(
       info(msg)
       sessionLog = IndexedSeq(msg)
     } else {
-      val uriFuture = Future { client.get.getServerUri.get() }
+      val uriFuture = Future {
+        client.get.getServerUri.get()
+      }(sessionManageExecutors)
 
       uriFuture.onSuccess { case url =>
         rscDriverUri = Option(url)
         sessionSaveLock.synchronized {
           sessionStore.save(RECOVERY_SESSION_TYPE, recoveryMetadata)
         }
-      }
-      uriFuture.onFailure { case e => warn("Fail to get rsc uri", e) }
+      }(sessionManageExecutors)
+
+      uriFuture.onFailure {
+        case e => warn("Fail to get rsc uri", e)
+      }(sessionManageExecutors)
 
       // Send a dummy job that will return once the client is ready to be 
used, and set the
       // state to "idle" at that point.
@@ -576,7 +581,7 @@ class InteractiveSession(
     }
   }
 
-  def interrupt(): Future[Unit] = {
+  def interrupt(): Future[AnyVal] = {
     stop()
   }
 
diff --git a/server/src/main/scala/org/apache/livy/sessions/Session.scala 
b/server/src/main/scala/org/apache/livy/sessions/Session.scala
index b82c2504..423f8ebb 100644
--- a/server/src/main/scala/org/apache/livy/sessions/Session.scala
+++ b/server/src/main/scala/org/apache/livy/sessions/Session.scala
@@ -20,6 +20,7 @@ package org.apache.livy.sessions
 import java.io.InputStream
 import java.net.{URI, URISyntaxException}
 import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{Executors, LinkedBlockingQueue, ThreadFactory, 
ThreadPoolExecutor, TimeUnit}
 import java.util.UUID
 
 import scala.concurrent.{ExecutionContext, Future}
@@ -141,6 +142,16 @@ object Session {
   }
 }
 
+class NamedThreadFactory(prefix: String) extends ThreadFactory {
+  private val defaultFactory = Executors.defaultThreadFactory()
+
+  override def newThread(r: Runnable): Thread = {
+    val thread = defaultFactory.newThread(r)
+    thread.setName(prefix + "-" + thread.getName)
+    thread
+  }
+}
+
 abstract class Session(
     val id: Int,
     val name: Option[String],
@@ -159,6 +170,25 @@ abstract class Session(
 
   import Session._
 
+  protected val sessionManageExecutors: ExecutionContext = {
+    val poolSize = livyConf.getInt(LivyConf.SESSION_MANAGE_THREADS)
+    val poolQueueSize = 
livyConf.getInt(LivyConf.SESSION_MANAGE_WAIT_QUEUE_SIZE)
+    val keepAliveTime = livyConf.getTimeAsMs(
+      LivyConf.SESSION_MANAGE_KEEPALIVE_TIME) / 1000
+    debug(s"Background session manage executors with size=${poolSize}," +
+      s" wait queue size= ${poolQueueSize}, keepalive time ${keepAliveTime} 
seconds")
+    val queue = new LinkedBlockingQueue[Runnable](poolQueueSize)
+    val executor = new ThreadPoolExecutor(
+      poolSize,
+      poolSize,
+      keepAliveTime,
+      TimeUnit.SECONDS,
+      queue,
+      new NamedThreadFactory("LivyServer2-SessionManageExecutors"))
+    executor.allowCoreThreadTimeOut(true)
+    ExecutionContext.fromExecutorService(executor)
+  }
+
   protected implicit val executionContext = ExecutionContext.global
 
   // validate session name. The name should not be a number
@@ -202,7 +232,7 @@ abstract class Session(
 
   def start(): Unit
 
-  def stop(): Future[Unit] = Future {
+  def stop(): Future[AnyVal] = Future {
     try {
       info(s"Stopping $this...")
       stopSession()
@@ -228,7 +258,7 @@ abstract class Session(
       case e: Exception =>
         warn(s"Error cleaning up session $id staging dir.", e)
     }
-  }
+  }(sessionManageExecutors)
 
 
   override def toString(): String = s"${this.getClass.getSimpleName} $id"

Reply via email to