This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ee96b5d0 [KYUUBI #4968] Simplify Option conversion
5ee96b5d0 is described below

commit 5ee96b5d063ec2b3545578c42b19620862461613
Author: Cheng Pan <[email protected]>
AuthorDate: Thu Jun 15 20:53:50 2023 +0800

    [KYUUBI #4968] Simplify Option conversion
    
    ### _Why are the changes needed?_
    
    Remove unnecessary conversion between `value = valueOpt.orNull` and 
`valueOpt = Option(value)`
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run 
test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests)
 locally before make a pull request
    
    Closes #4968 from pan3793/option.
    
    Closes #4968
    
    882181372 [Cheng Pan] fix
    c79a72116 [Cheng Pan] nit
    fdc95f221 [Cheng Pan] nit
    f9e189fc0 [Cheng Pan] nit
    2bdfe9a13 [Cheng Pan] nit
    d02e2cfa1 [Cheng Pan] nit
    ac8b5fb5d [Cheng Pan] Simplify Option convertion
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../kyuubi/operation/BatchJobSubmission.scala      |  4 +-
 .../kyuubi/operation/KyuubiOperationManager.scala  |  4 +-
 .../kyuubi/server/KyuubiRestFrontendService.scala  |  2 +-
 .../kyuubi/server/api/v1/BatchesResource.scala     | 78 ++++++++++------------
 .../kyuubi/server/metadata/MetadataManager.scala   | 10 +--
 ...hSessionImpl.scala => KyuubiBatchSession.scala} |  2 +-
 .../kyuubi/session/KyuubiSessionManager.scala      | 32 ++++-----
 .../org/apache/kyuubi/WithKyuubiServerOnYarn.scala | 10 +--
 .../server/api/v1/BatchesResourceSuite.scala       | 16 ++---
 .../server/metadata/MetadataManagerSuite.scala     |  4 +-
 10 files changed, 78 insertions(+), 84 deletions(-)

diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index e58539ff7..ab2cfa302 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -36,7 +36,7 @@ import 
org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
 import org.apache.kyuubi.operation.OperationState.{isTerminal, CANCELED, 
OperationState, RUNNING}
 import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.server.metadata.api.Metadata
-import org.apache.kyuubi.session.KyuubiBatchSessionImpl
+import org.apache.kyuubi.session.KyuubiBatchSession
 
 /**
  * The state of batch operation is special. In general, the lifecycle of state 
is:
@@ -51,7 +51,7 @@ import org.apache.kyuubi.session.KyuubiBatchSessionImpl
  * user close the batch session that means the final status is CANCELED.
  */
 class BatchJobSubmission(
-    session: KyuubiBatchSessionImpl,
+    session: KyuubiBatchSession,
     val batchType: String,
     val batchName: String,
     resource: String,
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
index 4730d1618..3078401b0 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
@@ -28,7 +28,7 @@ import 
org.apache.kyuubi.metrics.MetricsConstants.OPERATION_OPEN
 import org.apache.kyuubi.metrics.MetricsSystem
 import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
 import org.apache.kyuubi.server.metadata.api.Metadata
-import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, KyuubiSessionImpl, 
Session}
+import org.apache.kyuubi.session.{KyuubiBatchSession, KyuubiSessionImpl, 
Session}
 import org.apache.kyuubi.sql.plan.command.RunnableCommand
 import org.apache.kyuubi.util.ThriftUtils
 
@@ -74,7 +74,7 @@ class KyuubiOperationManager private (name: String) extends 
OperationManager(nam
   }
 
   def newBatchJobSubmissionOperation(
-      session: KyuubiBatchSessionImpl,
+      session: KyuubiBatchSession,
       batchType: String,
       batchName: String,
       resource: String,
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
index 86cb28aed..5b6eb0408 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
@@ -123,7 +123,7 @@ class KyuubiRestFrontendService(override val serverable: 
Serverable)
           
sessionManager.getPeerInstanceClosedBatchSessions(connectionUrl).foreach { 
batch =>
             Utils.tryLogNonFatalError {
               val sessionHandle = SessionHandle.fromUUID(batch.identifier)
-              
Option(sessionManager.getBatchSessionImpl(sessionHandle)).foreach(_.close())
+              sessionManager.getBatchSession(sessionHandle).foreach(_.close())
             }
           }
         } catch {
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index 11c36c757..8271c8b88 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -38,7 +38,7 @@ import org.apache.kyuubi.{Logging, Utils}
 import org.apache.kyuubi.client.api.v1.dto._
 import org.apache.kyuubi.client.exception.KyuubiRestException
 import org.apache.kyuubi.client.util.BatchUtils._
-import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.config.KyuubiReservedKeys._
 import org.apache.kyuubi.engine.{ApplicationInfo, KyuubiApplicationManager}
 import org.apache.kyuubi.operation.{BatchJobSubmission, FetchOrientation, 
OperationState}
@@ -46,7 +46,7 @@ import org.apache.kyuubi.server.api.ApiRequestContext
 import org.apache.kyuubi.server.api.v1.BatchesResource._
 import org.apache.kyuubi.server.metadata.MetadataManager
 import org.apache.kyuubi.server.metadata.api.Metadata
-import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, 
KyuubiSessionManager, SessionHandle}
+import org.apache.kyuubi.session.{KyuubiBatchSession, KyuubiSessionManager, 
SessionHandle}
 import org.apache.kyuubi.util.JdbcUtils
 
 @Tag(name = "Batch")
@@ -54,45 +54,33 @@ import org.apache.kyuubi.util.JdbcUtils
 private[v1] class BatchesResource extends ApiRequestContext with Logging {
   private val internalRestClients = new ConcurrentHashMap[String, 
InternalRestClient]()
   private lazy val internalSocketTimeout =
-    fe.getConf.get(KyuubiConf.BATCH_INTERNAL_REST_CLIENT_SOCKET_TIMEOUT)
+    fe.getConf.get(BATCH_INTERNAL_REST_CLIENT_SOCKET_TIMEOUT).toInt
   private lazy val internalConnectTimeout =
-    fe.getConf.get(KyuubiConf.BATCH_INTERNAL_REST_CLIENT_CONNECT_TIMEOUT)
+    fe.getConf.get(BATCH_INTERNAL_REST_CLIENT_CONNECT_TIMEOUT).toInt
 
   private def getInternalRestClient(kyuubiInstance: String): 
InternalRestClient = {
     internalRestClients.computeIfAbsent(
       kyuubiInstance,
-      kyuubiInstance => {
-        new InternalRestClient(
-          kyuubiInstance,
-          internalSocketTimeout.toInt,
-          internalConnectTimeout.toInt)
-      })
+      k => new InternalRestClient(k, internalSocketTimeout, 
internalConnectTimeout))
   }
 
   private def sessionManager = 
fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]
 
-  private def buildBatch(session: KyuubiBatchSessionImpl): Batch = {
+  private def buildBatch(session: KyuubiBatchSession): Batch = {
     val batchOp = session.batchJobSubmissionOp
     val batchOpStatus = batchOp.getStatus
-    val batchAppStatus = batchOp.getApplicationInfo
 
-    val name = 
Option(batchOp.batchName).getOrElse(batchAppStatus.map(_.name).orNull)
-    var appId: String = null
-    var appUrl: String = null
-    var appState: String = null
-    var appDiagnostic: String = null
-
-    if (!OperationState.isTerminal(batchOpStatus.state) && 
batchAppStatus.nonEmpty) {
-      appId = batchAppStatus.get.id
-      appUrl = batchAppStatus.get.url.orNull
-      appState = batchAppStatus.get.state.toString
-      appDiagnostic = batchAppStatus.get.error.orNull
-    } else {
-      val metadata = sessionManager.getBatchMetadata(batchOp.batchId)
-      appId = metadata.engineId
-      appUrl = metadata.engineUrl
-      appState = metadata.engineState
-      appDiagnostic = metadata.engineError.orNull
+    val (name, appId, appUrl, appState, appDiagnostic) = 
batchOp.getApplicationInfo.map { appInfo =>
+      val name = Option(batchOp.batchName).getOrElse(appInfo.name)
+      (name, appInfo.id, appInfo.url.orNull, appInfo.state.toString, 
appInfo.error.orNull)
+    }.getOrElse {
+      sessionManager.getBatchMetadata(batchOp.batchId) match {
+        case Some(batch) =>
+          val diagnostic = batch.engineError.orNull
+          (batchOp.batchName, batch.engineId, batch.engineUrl, 
batch.engineState, diagnostic)
+        case None =>
+          (batchOp.batchName, null, null, null, null)
+      }
     }
 
     new Batch(
@@ -185,8 +173,8 @@ private[v1] class BatchesResource extends ApiRequestContext 
with Logging {
       @FormDataParam("resourceFile") resourceFileInputStream: InputStream,
       @FormDataParam("resourceFile") resourceFileMetadata: 
FormDataContentDisposition): Batch = {
     require(
-      fe.getConf.get(KyuubiConf.BATCH_RESOURCE_UPLOAD_ENABLED),
-      "Batch resource upload function is not enabled.")
+      fe.getConf.get(BATCH_RESOURCE_UPLOAD_ENABLED),
+      "Batch resource upload function is disabled.")
     require(
       batchRequest != null,
       "batchRequest is required and please check the content type" +
@@ -228,7 +216,7 @@ private[v1] class BatchesResource extends ApiRequestContext 
with Logging {
     }
 
     userProvidedBatchId.flatMap { batchId =>
-      Option(sessionManager.getBatchFromMetadataStore(batchId))
+      sessionManager.getBatchFromMetadataStore(batchId)
     } match {
       case Some(batch) =>
         markDuplicated(batch)
@@ -254,11 +242,17 @@ private[v1] class BatchesResource extends 
ApiRequestContext with Logging {
             request)
         } match {
           case Success(sessionHandle) =>
-            buildBatch(sessionManager.getBatchSessionImpl(sessionHandle))
+            sessionManager.getBatchSession(sessionHandle) match {
+              case Some(batchSession) => buildBatch(batchSession)
+              case None => throw new IllegalStateException(
+                  s"can not find batch $batchId from metadata store")
+            }
           case Failure(cause) if JdbcUtils.isDuplicatedKeyDBErr(cause) =>
-            val batch = sessionManager.getBatchFromMetadataStore(batchId)
-            assert(batch != null, s"can not find duplicated batch $batchId 
from metadata store")
-            markDuplicated(batch)
+            sessionManager.getBatchFromMetadataStore(batchId) match {
+              case Some(batch) => markDuplicated(batch)
+              case None => throw new IllegalStateException(
+                  s"can not find duplicated batch $batchId from metadata 
store")
+            }
         }
     }
   }
@@ -280,10 +274,10 @@ private[v1] class BatchesResource extends 
ApiRequestContext with Logging {
   def batchInfo(@PathParam("batchId") batchId: String): Batch = {
     val userName = fe.getSessionUser(Map.empty[String, String])
     val sessionHandle = formatSessionHandle(batchId)
-    Option(sessionManager.getBatchSessionImpl(sessionHandle)).map { 
batchSession =>
+    sessionManager.getBatchSession(sessionHandle).map { batchSession =>
       buildBatch(batchSession)
     }.getOrElse {
-      Option(sessionManager.getBatchMetadata(batchId)).map { metadata =>
+      sessionManager.getBatchMetadata(batchId).map { metadata =>
         if (OperationState.isTerminal(OperationState.withName(metadata.state)) 
||
           metadata.kyuubiInstance == fe.connectionUrl) {
           MetadataManager.buildBatch(metadata)
@@ -359,7 +353,7 @@ private[v1] class BatchesResource extends ApiRequestContext 
with Logging {
       @QueryParam("size") @DefaultValue("100") size: Int): OperationLog = {
     val userName = fe.getSessionUser(Map.empty[String, String])
     val sessionHandle = formatSessionHandle(batchId)
-    Option(sessionManager.getBatchSessionImpl(sessionHandle)).map { 
batchSession =>
+    sessionManager.getBatchSession(sessionHandle).map { batchSession =>
       try {
         val submissionOp = batchSession.batchJobSubmissionOp
         val rowSet = 
submissionOp.getOperationLogRowSet(FetchOrientation.FETCH_NEXT, from, size)
@@ -379,7 +373,7 @@ private[v1] class BatchesResource extends ApiRequestContext 
with Logging {
           throw new NotFoundException(errorMsg)
       }
     }.getOrElse {
-      Option(sessionManager.getBatchMetadata(batchId)).map { metadata =>
+      sessionManager.getBatchMetadata(batchId).map { metadata =>
         if (fe.connectionUrl != metadata.kyuubiInstance) {
           val internalRestClient = 
getInternalRestClient(metadata.kyuubiInstance)
           internalRestClient.getBatchLocalLog(userName, batchId, from, size)
@@ -408,7 +402,7 @@ private[v1] class BatchesResource extends ApiRequestContext 
with Logging {
 
     val userName = fe.getSessionUser(hs2ProxyUser)
 
-    Option(sessionManager.getBatchSessionImpl(sessionHandle)).map { 
batchSession =>
+    sessionManager.getBatchSession(sessionHandle).map { batchSession =>
       if (userName != batchSession.user) {
         throw new WebApplicationException(
           s"$userName is not allowed to close the session belong to 
${batchSession.user}",
@@ -418,7 +412,7 @@ private[v1] class BatchesResource extends ApiRequestContext 
with Logging {
       val (success, msg) = batchSession.batchJobSubmissionOp.getKillMessage
       new CloseBatchResponse(success, msg)
     }.getOrElse {
-      Option(sessionManager.getBatchMetadata(batchId)).map { metadata =>
+      sessionManager.getBatchMetadata(batchId).map { metadata =>
         if (userName != metadata.username) {
           throw new WebApplicationException(
             s"$userName is not allowed to close the session belong to 
${metadata.username}",
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
index 88a7f4e4e..17beeb62a 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
@@ -124,13 +124,13 @@ class MetadataManager extends 
AbstractService("MetadataManager") {
     }
   }
 
-  def getBatch(batchId: String): Batch = {
-    Option(getBatchSessionMetadata(batchId)).map(buildBatch).orNull
+  def getBatch(batchId: String): Option[Batch] = {
+    getBatchSessionMetadata(batchId).map(buildBatch)
   }
 
-  def getBatchSessionMetadata(batchId: String): Metadata = {
-    Option(withMetadataRequestMetrics(_metadataStore.getMetadata(batchId, 
true))).filter(
-      _.sessionType == SessionType.BATCH).orNull
+  def getBatchSessionMetadata(batchId: String): Option[Metadata] = {
+    Option(withMetadataRequestMetrics(_metadataStore.getMetadata(batchId, 
true)))
+      .filter(_.sessionType == SessionType.BATCH)
   }
 
   def getBatches(
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
similarity index 99%
rename from 
kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
rename to 
kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
index ba2046829..ded4c8bf4 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
@@ -30,7 +30,7 @@ import org.apache.kyuubi.operation.OperationState
 import org.apache.kyuubi.server.metadata.api.Metadata
 import org.apache.kyuubi.session.SessionType.SessionType
 
-class KyuubiBatchSessionImpl(
+class KyuubiBatchSession(
     user: String,
     password: String,
     ipAddress: String,
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index d5504ed19..6e74bedbf 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -121,7 +121,7 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
       super.closeSession(sessionHandle)
     } finally {
       session match {
-        case _: KyuubiBatchSessionImpl =>
+        case _: KyuubiBatchSession =>
           batchLimiter.foreach(_.decrement(UserIpAddress(session.user, 
session.ipAddress)))
         case _ =>
           limiter.foreach(_.decrement(UserIpAddress(session.user, 
session.ipAddress)))
@@ -142,11 +142,11 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
       batchConf: Map[String, String],
       batchArgs: Seq[String],
       recoveryMetadata: Option[Metadata] = None,
-      shouldRunAsync: Boolean): KyuubiBatchSessionImpl = {
+      shouldRunAsync: Boolean): KyuubiBatchSession = {
     // scalastyle:on
     val username = Option(user).filter(_.nonEmpty).getOrElse("anonymous")
     val sessionConf = this.getConf.getUserDefaults(user)
-    new KyuubiBatchSessionImpl(
+    new KyuubiBatchSession(
       username,
       password,
       ipAddress,
@@ -163,7 +163,7 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
       shouldRunAsync)
   }
 
-  private[kyuubi] def openBatchSession(batchSession: KyuubiBatchSessionImpl): 
SessionHandle = {
+  private[kyuubi] def openBatchSession(batchSession: KyuubiBatchSession): 
SessionHandle = {
     val user = batchSession.user
     val ipAddress = batchSession.ipAddress
     batchLimiter.foreach(_.increment(UserIpAddress(user, ipAddress)))
@@ -216,8 +216,8 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
     openBatchSession(batchSession)
   }
 
-  def getBatchSessionImpl(sessionHandle: SessionHandle): 
KyuubiBatchSessionImpl = {
-    
getSessionOption(sessionHandle).map(_.asInstanceOf[KyuubiBatchSessionImpl]).orNull
+  def getBatchSession(sessionHandle: SessionHandle): 
Option[KyuubiBatchSession] = {
+    getSessionOption(sessionHandle).map(_.asInstanceOf[KyuubiBatchSession])
   }
 
   def insertMetadata(metadata: Metadata): Unit = {
@@ -229,15 +229,15 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
   }
 
   def getMetadataRequestsRetryRef(identifier: String): 
Option[MetadataRequestsRetryRef] = {
-    
Option(metadataManager.map(_.getMetadataRequestsRetryRef(identifier)).orNull)
+    metadataManager.flatMap(mm => 
Option(mm.getMetadataRequestsRetryRef(identifier)))
   }
 
   def deRegisterMetadataRequestsRetryRef(identifier: String): Unit = {
     metadataManager.foreach(_.deRegisterRequestsRetryRef(identifier))
   }
 
-  def getBatchFromMetadataStore(batchId: String): Batch = {
-    metadataManager.map(_.getBatch(batchId)).orNull
+  def getBatchFromMetadataStore(batchId: String): Option[Batch] = {
+    metadataManager.flatMap(mm => mm.getBatch(batchId))
   }
 
   def getBatchesFromMetadataStore(
@@ -248,13 +248,13 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
       endTime: Long,
       from: Int,
       size: Int): Seq[Batch] = {
-    metadataManager.map(
-      _.getBatches(batchType, batchUser, batchState, createTime, endTime, 
from, size))
-      .getOrElse(Seq.empty)
+    metadataManager.map { mm =>
+      mm.getBatches(batchType, batchUser, batchState, createTime, endTime, 
from, size)
+    }.getOrElse(Seq.empty)
   }
 
-  def getBatchMetadata(batchId: String): Metadata = {
-    metadataManager.map(_.getBatchSessionMetadata(batchId)).orNull
+  def getBatchMetadata(batchId: String): Option[Metadata] = {
+    metadataManager.flatMap(_.getBatchSessionMetadata(batchId))
   }
 
   @VisibleForTesting
@@ -273,7 +273,7 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
     startEngineAliveChecker()
   }
 
-  def getBatchSessionsToRecover(kyuubiInstance: String): 
Seq[KyuubiBatchSessionImpl] = {
+  def getBatchSessionsToRecover(kyuubiInstance: String): 
Seq[KyuubiBatchSession] = {
     Seq(OperationState.PENDING, OperationState.RUNNING).flatMap { 
stateToRecover =>
       metadataManager.map(_.getBatchesRecoveryMetadata(
         stateToRecover.toString,
@@ -349,7 +349,7 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
   private def startEngineAliveChecker(): Unit = {
     val interval = conf.get(KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL)
     val checkTask: Runnable = () => {
-      allSessions.foreach { session =>
+      allSessions().foreach { session =>
         if (!session.asInstanceOf[KyuubiSessionImpl].checkEngineAlive()) {
           try {
             closeSession(session.handle)
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
index f2ec60fea..2ed413dcb 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
@@ -31,7 +31,7 @@ import org.apache.kyuubi.engine.ApplicationState._
 import org.apache.kyuubi.operation.{FetchOrientation, HiveJDBCTestHelper, 
OperationState}
 import org.apache.kyuubi.operation.OperationState.ERROR
 import org.apache.kyuubi.server.MiniYarnService
-import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, KyuubiSessionManager}
+import org.apache.kyuubi.session.{KyuubiBatchSession, KyuubiSessionManager}
 
 /**
  * To developers:
@@ -119,7 +119,7 @@ class KyuubiOperationYarnClusterSuite extends 
WithKyuubiServerOnYarn with HiveJD
       batchRequest.getConf.asScala.toMap,
       batchRequest)
 
-    val session = 
sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSessionImpl]
+    val session = 
sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSession]
     val batchJobSubmissionOp = session.batchJobSubmissionOp
 
     eventually(timeout(3.minutes), interval(50.milliseconds)) {
@@ -130,8 +130,8 @@ class KyuubiOperationYarnClusterSuite extends 
WithKyuubiServerOnYarn with HiveJD
 
     eventually(timeout(10.seconds)) {
       val metadata = 
session.sessionManager.getBatchMetadata(session.handle.identifier.toString)
-      assert(metadata.state === "RUNNING")
-      assert(metadata.engineId.startsWith("application_"))
+      assert(metadata.map(_.state).contains("RUNNING"))
+      assert(metadata.map(_.engineId).get.startsWith("application_"))
     }
 
     val killResponse = 
yarnOperation.killApplicationByTag(sessionHandle.identifier.toString)
@@ -179,7 +179,7 @@ class KyuubiOperationYarnClusterSuite extends 
WithKyuubiServerOnYarn with HiveJD
       batchRequest.getConf.asScala.toMap,
       batchRequest)
 
-    val session = 
sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSessionImpl]
+    val session = 
sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSession]
     val batchJobSubmissionOp = session.batchJobSubmissionOp
 
     eventually(timeout(3.minutes), interval(50.milliseconds)) {
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
index dc59d75e3..62a7fec13 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
@@ -46,7 +46,7 @@ import org.apache.kyuubi.server.KyuubiRestFrontendService
 import 
org.apache.kyuubi.server.http.authentication.AuthenticationHandler.AUTHORIZATION_HEADER
 import org.apache.kyuubi.server.metadata.api.Metadata
 import org.apache.kyuubi.service.authentication.KyuubiAuthenticationFactory
-import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, 
KyuubiSessionManager, SessionHandle, SessionType}
+import org.apache.kyuubi.session.{KyuubiBatchSession, KyuubiSessionManager, 
SessionHandle, SessionType}
 
 class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper 
with BatchTestHelper {
   override protected lazy val conf: KyuubiConf = KyuubiConf()
@@ -464,8 +464,8 @@ class BatchesResourceSuite extends KyuubiFunSuite with 
RestFrontendTestHelper wi
     sessionManager.insertMetadata(batchMetadata)
     sessionManager.insertMetadata(batchMetadata2)
 
-    
assert(sessionManager.getBatchFromMetadataStore(batchId1).getState.equals("PENDING"))
-    
assert(sessionManager.getBatchFromMetadataStore(batchId2).getState.equals("PENDING"))
+    
assert(sessionManager.getBatchFromMetadataStore(batchId1).map(_.getState).contains("PENDING"))
+    
assert(sessionManager.getBatchFromMetadataStore(batchId2).map(_.getState).contains("PENDING"))
 
     val sparkBatchProcessBuilder = new SparkBatchProcessBuilder(
       "kyuubi",
@@ -501,8 +501,8 @@ class BatchesResourceSuite extends KyuubiFunSuite with 
RestFrontendTestHelper wi
 
     val sessionHandle1 = SessionHandle.fromUUID(batchId1)
     val sessionHandle2 = SessionHandle.fromUUID(batchId2)
-    val session1 = 
sessionManager.getSession(sessionHandle1).asInstanceOf[KyuubiBatchSessionImpl]
-    val session2 = 
sessionManager.getSession(sessionHandle2).asInstanceOf[KyuubiBatchSessionImpl]
+    val session1 = 
sessionManager.getSession(sessionHandle1).asInstanceOf[KyuubiBatchSession]
+    val session2 = 
sessionManager.getSession(sessionHandle2).asInstanceOf[KyuubiBatchSession]
     assert(session1.createTime === batchMetadata.createTime)
     assert(session2.createTime === batchMetadata2.createTime)
 
@@ -638,8 +638,8 @@ class BatchesResourceSuite extends KyuubiFunSuite with 
RestFrontendTestHelper wi
       .post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
     assert(200 == response.getStatus)
     val batch = response.readEntity(classOf[Batch])
-    val batchSession = 
sessionManager.getBatchSessionImpl(SessionHandle.fromUUID(batch.getId))
-    assert(batchSession.ipAddress === realClientIp)
+    val batchSession = 
sessionManager.getBatchSession(SessionHandle.fromUUID(batch.getId))
+    assert(batchSession.map(_.ipAddress).contains(realClientIp))
   }
 
   test("expose the metrics with operation type and current state") {
@@ -708,6 +708,6 @@ class BatchesResourceSuite extends KyuubiFunSuite with 
RestFrontendTestHelper wi
     val sessionHandleRegex = "\\[[\\S]*\\]".r
     val batchId = 
sessionHandleRegex.findFirstMatchIn(e.getMessage).get.group(0)
       .replaceAll("\\[", "").replaceAll("\\]", "")
-    assert(sessionManager.getBatchMetadata(batchId).state == "CANCELED")
+    
assert(sessionManager.getBatchMetadata(batchId).map(_.state).contains("CANCELED"))
   }
 }
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala
index 75c935a3d..8064b7f1f 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala
@@ -66,7 +66,7 @@ class MetadataManagerSuite extends KyuubiFunSuite {
       retryRef.addRetryingMetadataRequest(UpdateMetadata(metadataToUpdate))
       eventually(timeout(3.seconds)) {
         assert(retryRef.hasRemainingRequests())
-        assert(metadataManager.getBatch(metadata.identifier).getState === 
"PENDING")
+        
assert(metadataManager.getBatch(metadata.identifier).map(_.getState).contains("PENDING"))
       }
 
       val metadata2 = metadata.copy(identifier = UUID.randomUUID().toString)
@@ -84,7 +84,7 @@ class MetadataManagerSuite extends KyuubiFunSuite {
 
       eventually(timeout(3.seconds)) {
         assert(!retryRef2.hasRemainingRequests())
-        assert(metadataManager.getBatch(metadata2.identifier).getState === 
"RUNNING")
+        
assert(metadataManager.getBatch(metadata2.identifier).map(_.getState).contains("RUNNING"))
       }
 
       metadataManager.identifierRequestsAsyncRetryRefs.clear()

Reply via email to