This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f74e8bf8 [KYUUBI #4847] Close the session immediately when engine corrupt
3f74e8bf8 is described below

commit 3f74e8bf840ce68542da4e412ee202bdb532a206
Author: huangzhir <[email protected]>
AuthorDate: Sun Jun 11 13:02:56 2023 +0800

    [KYUUBI #4847] Close the session immediately when engine corrupt
    
    ### _Why are the changes needed?_
    
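    To close https://github.com/apache/kyuubi/issues/4847: when the engine backing a
    Kyuubi session stops (for example after `spark.stop()` is run in the session), the
    Kyuubi session itself is never closed and leaks. This change adds an opt-in
    (ENGINE_ALIVE_PROBE_ENABLED) periodic engine alive probe; a session is closed once
    its engine has failed several consecutive probes and no probe has succeeded within
    ENGINE_ALIVE_TIMEOUT.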
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [X] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4848 from huangzhir/kyuubisession_leak.
    
    Closes #4847
    
    37e58ce66 [Cheng Pan] Update kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
    170c044f5 [huangzhir] Add some logging and modify some code style.
    0c393cd9d [huangzhir] fix sytle
    d0183298e [huangzhir] "Use the ENGINE_ALIVE_PROBE_ENABLED configuration to enable the Kyuubi session engine alive probe. The default value of ENGINE_ALIVE_PROBE_ENABLED is false. Use the ENGINE_ALIVE_TIMEOUT configuration to determine the duration for checking the engine's alive status. The engineAliveMaxFailCount configuration controls the maximum number of failures allowed during engine alive checks."
    b716dd8f6 [huangzhir] fix kyuubi session leak caused by engine stop
    
    Lead-authored-by: huangzhir <[email protected]>
    Co-authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../apache/kyuubi/session/KyuubiSessionImpl.scala  | 38 +++++++++++++
 .../kyuubi/session/KyuubiSessionManager.scala      | 28 +++++++++-
 .../server/api/v1/SessionsResourceSuite.scala      | 65 +++++++++++++++++++++-
 .../server/rest/client/SessionRestApiSuite.scala   | 34 +++++++++++
 4 files changed, 163 insertions(+), 2 deletions(-)

diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index f72fcadd0..237eb3ca6 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -114,6 +114,7 @@ class KyuubiSessionImpl(
     super.open()
 
     runOperation(launchEngineOp)
+    engineLastAlive = System.currentTimeMillis()
   }
 
   private[kyuubi] def openEngineSession(extraEngineLog: Option[OperationLog] = 
None): Unit =
@@ -283,4 +284,41 @@ class KyuubiSessionImpl(
       case _ => super.executeStatement(statement, confOverlay, runAsync, 
queryTimeout)
     }
   }
+
+  @volatile private var engineLastAlive: Long = _
+  val engineAliveTimeout = sessionConf.get(KyuubiConf.ENGINE_ALIVE_TIMEOUT)
+  val aliveProbeEnabled = 
sessionConf.get(KyuubiConf.ENGINE_ALIVE_PROBE_ENABLED)
+  var engineAliveMaxFailCount = 3
+  var engineAliveFailCount = 0
+
+  def checkEngineAlive(): Boolean = {
+    try {
+      if (!aliveProbeEnabled) return true
+      getInfo(TGetInfoType.CLI_DBMS_VER)
+      engineLastAlive = System.currentTimeMillis()
+      engineAliveFailCount = 0
+      true
+    } catch {
+      case e: Throwable =>
+        val now = System.currentTimeMillis()
+        engineAliveFailCount = engineAliveFailCount + 1
+        if (now - engineLastAlive > engineAliveTimeout &&
+          engineAliveFailCount >= engineAliveMaxFailCount) {
+          error(s"The engineRef[${engine.getEngineRefId}] is marked as not 
alive "
+            + s"due to a lack of recent successful alive probes. "
+            + s"The time since last successful probe: "
+            + s"${now - engineLastAlive} ms exceeds the timeout of 
$engineAliveTimeout ms. "
+            + s"The engine has failed $engineAliveFailCount times, "
+            + s"surpassing the maximum failure count of 
$engineAliveMaxFailCount.")
+          false
+        } else {
+          warn(
+            s"The engineRef[${engine.getEngineRefId}] alive probe fails, " +
+              s"${now - engineLastAlive} ms exceeds timeout 
$engineAliveTimeout ms, " +
+              s"and has failed $engineAliveFailCount times.",
+            e)
+          true
+        }
+    }
+  }
 }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 0ef3f1ac1..d5504ed19 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -17,6 +17,8 @@
 
 package org.apache.kyuubi.session
 
+import java.util.concurrent.TimeUnit
+
 import scala.collection.JavaConverters._
 
 import com.codahale.metrics.MetricRegistry
@@ -36,7 +38,7 @@ import org.apache.kyuubi.plugin.{GroupProvider, PluginLoader, 
SessionConfAdvisor
 import org.apache.kyuubi.server.metadata.{MetadataManager, 
MetadataRequestsRetryRef}
 import org.apache.kyuubi.server.metadata.api.Metadata
 import org.apache.kyuubi.sql.parser.server.KyuubiParser
-import org.apache.kyuubi.util.SignUtils
+import org.apache.kyuubi.util.{SignUtils, ThreadUtils}
 
 class KyuubiSessionManager private (name: String) extends SessionManager(name) 
{
 
@@ -61,6 +63,9 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
   private var batchLimiter: Option[SessionLimiter] = None
   lazy val (signingPrivateKey, signingPublicKey) = SignUtils.generateKeyPair()
 
+  private val engineAliveChecker =
+    
ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"$name-engine-alive-checker")
+
   override def initialize(conf: KyuubiConf): Unit = {
     this.conf = conf
     addService(applicationManager)
@@ -265,6 +270,7 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
       ms.registerGauge(EXEC_POOL_WORK_QUEUE_SIZE, getWorkQueueSize, 0)
     }
     super.start()
+    startEngineAliveChecker()
   }
 
   def getBatchSessionsToRecover(kyuubiInstance: String): 
Seq[KyuubiBatchSessionImpl] = {
@@ -339,4 +345,24 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
     Seq(userLimit, ipAddressLimit, userIpAddressLimit).find(_ > 0).map(_ =>
       SessionLimiter(userLimit, ipAddressLimit, userIpAddressLimit, 
userUnlimitedList.toSet))
   }
+
+  private def startEngineAliveChecker(): Unit = {
+    val interval = conf.get(KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL)
+    val checkTask: Runnable = () => {
+      allSessions.foreach { session =>
+        if (!session.asInstanceOf[KyuubiSessionImpl].checkEngineAlive()) {
+          try {
+            closeSession(session.handle)
+            logger.info(s"The session ${session.handle} has been closed " +
+              s"due to engine unresponsiveness (checked by the engine alive 
checker).")
+          } catch {
+            case e: KyuubiSQLException =>
+              warn(s"Error closing session ${session.handle}", e)
+          }
+        }
+      }
+    }
+    engineAliveChecker.scheduleWithFixedDelay(checkTask, interval, interval, 
TimeUnit.MILLISECONDS)
+  }
+
 }
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala
index 07a711de6..b197a489c 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala
@@ -29,7 +29,7 @@ import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
 
 import org.apache.kyuubi.{KyuubiFunSuite, RestFrontendTestHelper}
 import org.apache.kyuubi.client.api.v1.dto
-import org.apache.kyuubi.client.api.v1.dto._
+import org.apache.kyuubi.client.api.v1.dto.{SessionData, _}
 import org.apache.kyuubi.config.KyuubiConf
 import 
org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_CONNECTION_URL_KEY
 import org.apache.kyuubi.engine.ShareLevel
@@ -301,4 +301,67 @@ class SessionsResourceSuite extends KyuubiFunSuite with 
RestFrontendTestHelper {
       assert(sessionEvent.contains("The last 10 line(s) of log are:"))
     }
   }
+
+  test("fix kyuubi session leak caused by engine stop") {
+    // clean up all sessions
+    var response = webTarget.path("api/v1/sessions").request().get()
+    val sessionDataList = response.readEntity(new 
GenericType[List[SessionData]]() {})
+    sessionDataList.foreach(sessionData => {
+      response = 
webTarget.path(s"api/v1/sessions/${sessionData.getIdentifier}")
+        .request().delete()
+      assert(200 == response.getStatus)
+    })
+
+    // open a session
+    val requestObj = new SessionOpenRequest(Map(
+      KyuubiConf.ENGINE_ALIVE_PROBE_ENABLED.key -> "true",
+      KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL.key -> "5000",
+      KyuubiConf.ENGINE_ALIVE_TIMEOUT.key -> "3000").asJava)
+    response = webTarget.path("api/v1/sessions")
+      .request(MediaType.APPLICATION_JSON_TYPE)
+      .post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
+    val sessionHandle = 
response.readEntity(classOf[SessionHandle]).getIdentifier
+    val pathPrefix = s"api/v1/sessions/$sessionHandle"
+
+    response = webTarget.path("api/v1/sessions/count").request().get()
+    val openedSessionCount = response.readEntity(classOf[SessionOpenCount])
+    assert(openedSessionCount.getOpenSessionCount == 1)
+
+    var statementReq = new StatementRequest(
+      "spark.sql(\"show tables\")",
+      true,
+      3000,
+      Collections.singletonMap(KyuubiConf.OPERATION_LANGUAGE.key, "SCALA"))
+    response = webTarget
+      
.path(s"$pathPrefix/operations/statement").request(MediaType.APPLICATION_JSON_TYPE)
+      .post(Entity.entity(statementReq, MediaType.APPLICATION_JSON_TYPE))
+    assert(200 == response.getStatus)
+    var operationHandle = response.readEntity(classOf[OperationHandle])
+    assert(operationHandle !== null)
+    assert(openedSessionCount.getOpenSessionCount == 1)
+
+    statementReq = new StatementRequest(
+      "spark.close()",
+      true,
+      3000,
+      Collections.singletonMap(KyuubiConf.OPERATION_LANGUAGE.key, "SCALA"))
+    response = webTarget
+      
.path(s"$pathPrefix/operations/statement").request(MediaType.APPLICATION_JSON_TYPE)
+      .post(Entity.entity(statementReq, MediaType.APPLICATION_JSON_TYPE))
+    assert(200 == response.getStatus)
+    operationHandle = response.readEntity(classOf[OperationHandle])
+    assert(operationHandle !== null)
+
+    // Because the engine has stopped (due to spark.close), the Spark session 
is closed.
+    // Therefore, the Kyuubi session count should be 0.
+    eventually(timeout(30.seconds), interval(1000.milliseconds)) {
+      var response = webTarget.path("api/v1/sessions/count").request().get()
+      val openedSessionCount = response.readEntity(classOf[SessionOpenCount])
+      assert(openedSessionCount.getOpenSessionCount == 0)
+
+      response = webTarget.path("api/v1/sessions").request().get()
+      val sessionDataList = response.readEntity(new 
GenericType[List[SessionData]]() {})
+      assert(sessionDataList.isEmpty)
+    }
+  }
 }
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/SessionRestApiSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/SessionRestApiSuite.scala
index ed116d077..a1f0fc5ee 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/SessionRestApiSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/SessionRestApiSuite.scala
@@ -18,10 +18,13 @@
 package org.apache.kyuubi.server.rest.client
 
 import java.util
+import java.util.Collections
 
 import scala.collection.JavaConverters._
+import scala.concurrent.duration.DurationInt
 
 import org.apache.hive.service.rpc.thrift.TGetInfoType
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
 
 import org.apache.kyuubi.RestClientTestHelper
 import org.apache.kyuubi.client.{KyuubiRestClient, SessionRestApi}
@@ -163,6 +166,37 @@ class SessionRestApiSuite extends RestClientTestHelper {
     }
   }
 
+  test("fix kyuubi session leak caused by engine stop") {
+    withSessionRestApi { sessionRestApi =>
+      // close all sessions
+      var sessions = sessionRestApi.listSessions().asScala
+      sessions.foreach(session => 
sessionRestApi.closeSession(session.getIdentifier))
+
+      // open new session
+      val sessionOpenRequest = new SessionOpenRequest(Map(
+        KyuubiConf.ENGINE_ALIVE_PROBE_ENABLED.key -> "true",
+        KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL.key -> "5000",
+        KyuubiConf.ENGINE_ALIVE_TIMEOUT.key -> "3000").asJava)
+      val sessionHandle = sessionRestApi.openSession(sessionOpenRequest)
+
+      // get open session count
+      val sessionCount = sessionRestApi.getOpenSessionCount
+      assert(sessionCount == 1)
+
+      val statementReq = new StatementRequest(
+        "spark.stop()",
+        true,
+        3000,
+        Collections.singletonMap(KyuubiConf.OPERATION_LANGUAGE.key, "SCALA"))
+      sessionRestApi.executeStatement(sessionHandle.getIdentifier.toString, 
statementReq)
+
+      eventually(Timeout(30.seconds), interval(1.seconds)) {
+        assert(sessionRestApi.getOpenSessionCount == 0)
+        assert(sessionRestApi.listSessions().asScala.isEmpty)
+      }
+    }
+  }
+
   def withSessionRestApi[T](f: SessionRestApi => T): T = {
     val basicKyuubiRestClient: KyuubiRestClient =
       KyuubiRestClient.builder(baseUri.toString)

Reply via email to