This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 3f74e8bf8 [KYUUBI #4847] Close the session immediately when engine
corrupt
3f74e8bf8 is described below
commit 3f74e8bf840ce68542da4e412ee202bdb532a206
Author: huangzhir <[email protected]>
AuthorDate: Sun Jun 11 13:02:56 2023 +0800
[KYUUBI #4847] Close the session immediately when engine corrupt
### _Why are the changes needed?_
To close https://github.com/apache/kyuubi/issues/4847 (a Kyuubi session leak caused by the engine stopping).
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [X] [Run
test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #4848 from huangzhir/kyuubisession_leak.
Closes #4847
37e58ce66 [Cheng Pan] Update
kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
170c044f5 [huangzhir] Add some logging and modify some code style.
0c393cd9d [huangzhir] fix style
d0183298e [huangzhir] "Use the ENGINE_ALIVE_PROBE_ENABLED configuration to
enable the Kyuubi session engine alive probe. The default value of
ENGINE_ALIVE_PROBE_ENABLED is false. Use the ENGINE_ALIVE_TIMEOUT configuration
to determine the duration for checking the engine's alive status. The
engineAliveMaxFailCount configuration controls the maximum number of failures
allowed during engine alive checks."
b716dd8f6 [huangzhir] fix kyuubi session leak caused by engine stop
Lead-authored-by: huangzhir <[email protected]>
Co-authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../apache/kyuubi/session/KyuubiSessionImpl.scala | 38 +++++++++++++
.../kyuubi/session/KyuubiSessionManager.scala | 28 +++++++++-
.../server/api/v1/SessionsResourceSuite.scala | 65 +++++++++++++++++++++-
.../server/rest/client/SessionRestApiSuite.scala | 34 +++++++++++
4 files changed, 163 insertions(+), 2 deletions(-)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index f72fcadd0..237eb3ca6 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -114,6 +114,7 @@ class KyuubiSessionImpl(
super.open()
runOperation(launchEngineOp)
+ engineLastAlive = System.currentTimeMillis()
}
private[kyuubi] def openEngineSession(extraEngineLog: Option[OperationLog] =
None): Unit =
@@ -283,4 +284,41 @@ class KyuubiSessionImpl(
case _ => super.executeStatement(statement, confOverlay, runAsync,
queryTimeout)
}
}
+
+ @volatile private var engineLastAlive: Long = _
+ val engineAliveTimeout = sessionConf.get(KyuubiConf.ENGINE_ALIVE_TIMEOUT)
+ val aliveProbeEnabled =
sessionConf.get(KyuubiConf.ENGINE_ALIVE_PROBE_ENABLED)
+ var engineAliveMaxFailCount = 3
+ var engineAliveFailCount = 0
+
+ def checkEngineAlive(): Boolean = {
+ try {
+ if (!aliveProbeEnabled) return true
+ getInfo(TGetInfoType.CLI_DBMS_VER)
+ engineLastAlive = System.currentTimeMillis()
+ engineAliveFailCount = 0
+ true
+ } catch {
+ case e: Throwable =>
+ val now = System.currentTimeMillis()
+ engineAliveFailCount = engineAliveFailCount + 1
+ if (now - engineLastAlive > engineAliveTimeout &&
+ engineAliveFailCount >= engineAliveMaxFailCount) {
+ error(s"The engineRef[${engine.getEngineRefId}] is marked as not
alive "
+ + s"due to a lack of recent successful alive probes. "
+ + s"The time since last successful probe: "
+ + s"${now - engineLastAlive} ms exceeds the timeout of
$engineAliveTimeout ms. "
+ + s"The engine has failed $engineAliveFailCount times, "
+ + s"surpassing the maximum failure count of
$engineAliveMaxFailCount.")
+ false
+ } else {
+ warn(
+ s"The engineRef[${engine.getEngineRefId}] alive probe fails, " +
+ s"${now - engineLastAlive} ms exceeds timeout
$engineAliveTimeout ms, " +
+ s"and has failed $engineAliveFailCount times.",
+ e)
+ true
+ }
+ }
+ }
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 0ef3f1ac1..d5504ed19 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -17,6 +17,8 @@
package org.apache.kyuubi.session
+import java.util.concurrent.TimeUnit
+
import scala.collection.JavaConverters._
import com.codahale.metrics.MetricRegistry
@@ -36,7 +38,7 @@ import org.apache.kyuubi.plugin.{GroupProvider, PluginLoader,
SessionConfAdvisor
import org.apache.kyuubi.server.metadata.{MetadataManager,
MetadataRequestsRetryRef}
import org.apache.kyuubi.server.metadata.api.Metadata
import org.apache.kyuubi.sql.parser.server.KyuubiParser
-import org.apache.kyuubi.util.SignUtils
+import org.apache.kyuubi.util.{SignUtils, ThreadUtils}
class KyuubiSessionManager private (name: String) extends SessionManager(name)
{
@@ -61,6 +63,9 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
private var batchLimiter: Option[SessionLimiter] = None
lazy val (signingPrivateKey, signingPublicKey) = SignUtils.generateKeyPair()
+ private val engineAliveChecker =
+
ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"$name-engine-alive-checker")
+
override def initialize(conf: KyuubiConf): Unit = {
this.conf = conf
addService(applicationManager)
@@ -265,6 +270,7 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
ms.registerGauge(EXEC_POOL_WORK_QUEUE_SIZE, getWorkQueueSize, 0)
}
super.start()
+ startEngineAliveChecker()
}
def getBatchSessionsToRecover(kyuubiInstance: String):
Seq[KyuubiBatchSessionImpl] = {
@@ -339,4 +345,24 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
Seq(userLimit, ipAddressLimit, userIpAddressLimit).find(_ > 0).map(_ =>
SessionLimiter(userLimit, ipAddressLimit, userIpAddressLimit,
userUnlimitedList.toSet))
}
+
+ private def startEngineAliveChecker(): Unit = {
+ val interval = conf.get(KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL)
+ val checkTask: Runnable = () => {
+ allSessions.foreach { session =>
+ if (!session.asInstanceOf[KyuubiSessionImpl].checkEngineAlive()) {
+ try {
+ closeSession(session.handle)
+ logger.info(s"The session ${session.handle} has been closed " +
+ s"due to engine unresponsiveness (checked by the engine alive
checker).")
+ } catch {
+ case e: KyuubiSQLException =>
+ warn(s"Error closing session ${session.handle}", e)
+ }
+ }
+ }
+ }
+ engineAliveChecker.scheduleWithFixedDelay(checkTask, interval, interval,
TimeUnit.MILLISECONDS)
+ }
+
}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala
index 07a711de6..b197a489c 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala
@@ -29,7 +29,7 @@ import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
import org.apache.kyuubi.{KyuubiFunSuite, RestFrontendTestHelper}
import org.apache.kyuubi.client.api.v1.dto
-import org.apache.kyuubi.client.api.v1.dto._
+import org.apache.kyuubi.client.api.v1.dto.{SessionData, _}
import org.apache.kyuubi.config.KyuubiConf
import
org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_CONNECTION_URL_KEY
import org.apache.kyuubi.engine.ShareLevel
@@ -301,4 +301,67 @@ class SessionsResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper {
assert(sessionEvent.contains("The last 10 line(s) of log are:"))
}
}
+
+ test("fix kyuubi session leak caused by engine stop") {
+ // clean up all sessions
+ var response = webTarget.path("api/v1/sessions").request().get()
+ val sessionDataList = response.readEntity(new
GenericType[List[SessionData]]() {})
+ sessionDataList.foreach(sessionData => {
+ response =
webTarget.path(s"api/v1/sessions/${sessionData.getIdentifier}")
+ .request().delete()
+ assert(200 == response.getStatus)
+ })
+
+ // open a session
+ val requestObj = new SessionOpenRequest(Map(
+ KyuubiConf.ENGINE_ALIVE_PROBE_ENABLED.key -> "true",
+ KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL.key -> "5000",
+ KyuubiConf.ENGINE_ALIVE_TIMEOUT.key -> "3000").asJava)
+ response = webTarget.path("api/v1/sessions")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
+ val sessionHandle =
response.readEntity(classOf[SessionHandle]).getIdentifier
+ val pathPrefix = s"api/v1/sessions/$sessionHandle"
+
+ response = webTarget.path("api/v1/sessions/count").request().get()
+ val openedSessionCount = response.readEntity(classOf[SessionOpenCount])
+ assert(openedSessionCount.getOpenSessionCount == 1)
+
+ var statementReq = new StatementRequest(
+ "spark.sql(\"show tables\")",
+ true,
+ 3000,
+ Collections.singletonMap(KyuubiConf.OPERATION_LANGUAGE.key, "SCALA"))
+ response = webTarget
+
.path(s"$pathPrefix/operations/statement").request(MediaType.APPLICATION_JSON_TYPE)
+ .post(Entity.entity(statementReq, MediaType.APPLICATION_JSON_TYPE))
+ assert(200 == response.getStatus)
+ var operationHandle = response.readEntity(classOf[OperationHandle])
+ assert(operationHandle !== null)
+ assert(openedSessionCount.getOpenSessionCount == 1)
+
+ statementReq = new StatementRequest(
+ "spark.close()",
+ true,
+ 3000,
+ Collections.singletonMap(KyuubiConf.OPERATION_LANGUAGE.key, "SCALA"))
+ response = webTarget
+
.path(s"$pathPrefix/operations/statement").request(MediaType.APPLICATION_JSON_TYPE)
+ .post(Entity.entity(statementReq, MediaType.APPLICATION_JSON_TYPE))
+ assert(200 == response.getStatus)
+ operationHandle = response.readEntity(classOf[OperationHandle])
+ assert(operationHandle !== null)
+
+ // Because the engine has stopped (due to spark.close), the Spark session
is closed.
+ // Therefore, the Kyuubi session count should be 0.
+ eventually(timeout(30.seconds), interval(1000.milliseconds)) {
+ var response = webTarget.path("api/v1/sessions/count").request().get()
+ val openedSessionCount = response.readEntity(classOf[SessionOpenCount])
+ assert(openedSessionCount.getOpenSessionCount == 0)
+
+ response = webTarget.path("api/v1/sessions").request().get()
+ val sessionDataList = response.readEntity(new
GenericType[List[SessionData]]() {})
+ assert(sessionDataList.isEmpty)
+ }
+ }
}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/SessionRestApiSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/SessionRestApiSuite.scala
index ed116d077..a1f0fc5ee 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/SessionRestApiSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/SessionRestApiSuite.scala
@@ -18,10 +18,13 @@
package org.apache.kyuubi.server.rest.client
import java.util
+import java.util.Collections
import scala.collection.JavaConverters._
+import scala.concurrent.duration.DurationInt
import org.apache.hive.service.rpc.thrift.TGetInfoType
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.apache.kyuubi.RestClientTestHelper
import org.apache.kyuubi.client.{KyuubiRestClient, SessionRestApi}
@@ -163,6 +166,37 @@ class SessionRestApiSuite extends RestClientTestHelper {
}
}
+ test("fix kyuubi session leak caused by engine stop") {
+ withSessionRestApi { sessionRestApi =>
+ // close all sessions
+ var sessions = sessionRestApi.listSessions().asScala
+ sessions.foreach(session =>
sessionRestApi.closeSession(session.getIdentifier))
+
+ // open new session
+ val sessionOpenRequest = new SessionOpenRequest(Map(
+ KyuubiConf.ENGINE_ALIVE_PROBE_ENABLED.key -> "true",
+ KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL.key -> "5000",
+ KyuubiConf.ENGINE_ALIVE_TIMEOUT.key -> "3000").asJava)
+ val sessionHandle = sessionRestApi.openSession(sessionOpenRequest)
+
+ // get open session count
+ val sessionCount = sessionRestApi.getOpenSessionCount
+ assert(sessionCount == 1)
+
+ val statementReq = new StatementRequest(
+ "spark.stop()",
+ true,
+ 3000,
+ Collections.singletonMap(KyuubiConf.OPERATION_LANGUAGE.key, "SCALA"))
+ sessionRestApi.executeStatement(sessionHandle.getIdentifier.toString,
statementReq)
+
+ eventually(Timeout(30.seconds), interval(1.seconds)) {
+ assert(sessionRestApi.getOpenSessionCount == 0)
+ assert(sessionRestApi.listSessions().asScala.isEmpty)
+ }
+ }
+ }
+
def withSessionRestApi[T](f: SessionRestApi => T): T = {
val basicKyuubiRestClient: KyuubiRestClient =
KyuubiRestClient.builder(baseUri.toString)