This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 5ee96b5d0 [KYUUBI #4968] Simplify Option conversion
5ee96b5d0 is described below
commit 5ee96b5d063ec2b3545578c42b19620862461613
Author: Cheng Pan <[email protected]>
AuthorDate: Thu Jun 15 20:53:50 2023 +0800
[KYUUBI #4968] Simplify Option conversion
### _Why are the changes needed?_
Remove unnecessary conversion between `value = valueOpt.orNull` and
`valueOpt = Option(value)`
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly, including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #4968 from pan3793/option.
Closes #4968
882181372 [Cheng Pan] fix
c79a72116 [Cheng Pan] nit
fdc95f221 [Cheng Pan] nit
f9e189fc0 [Cheng Pan] nit
2bdfe9a13 [Cheng Pan] nit
d02e2cfa1 [Cheng Pan] nit
ac8b5fb5d [Cheng Pan] Simplify Option convertion
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../kyuubi/operation/BatchJobSubmission.scala | 4 +-
.../kyuubi/operation/KyuubiOperationManager.scala | 4 +-
.../kyuubi/server/KyuubiRestFrontendService.scala | 2 +-
.../kyuubi/server/api/v1/BatchesResource.scala | 78 ++++++++++------------
.../kyuubi/server/metadata/MetadataManager.scala | 10 +--
...hSessionImpl.scala => KyuubiBatchSession.scala} | 2 +-
.../kyuubi/session/KyuubiSessionManager.scala | 32 ++++-----
.../org/apache/kyuubi/WithKyuubiServerOnYarn.scala | 10 +--
.../server/api/v1/BatchesResourceSuite.scala | 16 ++---
.../server/metadata/MetadataManagerSuite.scala | 4 +-
10 files changed, 78 insertions(+), 84 deletions(-)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index e58539ff7..ab2cfa302 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -36,7 +36,7 @@ import
org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.operation.OperationState.{isTerminal, CANCELED,
OperationState, RUNNING}
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.server.metadata.api.Metadata
-import org.apache.kyuubi.session.KyuubiBatchSessionImpl
+import org.apache.kyuubi.session.KyuubiBatchSession
/**
* The state of batch operation is special. In general, the lifecycle of state
is:
@@ -51,7 +51,7 @@ import org.apache.kyuubi.session.KyuubiBatchSessionImpl
* user close the batch session that means the final status is CANCELED.
*/
class BatchJobSubmission(
- session: KyuubiBatchSessionImpl,
+ session: KyuubiBatchSession,
val batchType: String,
val batchName: String,
resource: String,
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
index 4730d1618..3078401b0 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
@@ -28,7 +28,7 @@ import
org.apache.kyuubi.metrics.MetricsConstants.OPERATION_OPEN
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.server.metadata.api.Metadata
-import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, KyuubiSessionImpl,
Session}
+import org.apache.kyuubi.session.{KyuubiBatchSession, KyuubiSessionImpl,
Session}
import org.apache.kyuubi.sql.plan.command.RunnableCommand
import org.apache.kyuubi.util.ThriftUtils
@@ -74,7 +74,7 @@ class KyuubiOperationManager private (name: String) extends
OperationManager(nam
}
def newBatchJobSubmissionOperation(
- session: KyuubiBatchSessionImpl,
+ session: KyuubiBatchSession,
batchType: String,
batchName: String,
resource: String,
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
index 86cb28aed..5b6eb0408 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
@@ -123,7 +123,7 @@ class KyuubiRestFrontendService(override val serverable:
Serverable)
sessionManager.getPeerInstanceClosedBatchSessions(connectionUrl).foreach {
batch =>
Utils.tryLogNonFatalError {
val sessionHandle = SessionHandle.fromUUID(batch.identifier)
-
Option(sessionManager.getBatchSessionImpl(sessionHandle)).foreach(_.close())
+ sessionManager.getBatchSession(sessionHandle).foreach(_.close())
}
}
} catch {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index 11c36c757..8271c8b88 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -38,7 +38,7 @@ import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.client.api.v1.dto._
import org.apache.kyuubi.client.exception.KyuubiRestException
import org.apache.kyuubi.client.util.BatchUtils._
-import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys._
import org.apache.kyuubi.engine.{ApplicationInfo, KyuubiApplicationManager}
import org.apache.kyuubi.operation.{BatchJobSubmission, FetchOrientation,
OperationState}
@@ -46,7 +46,7 @@ import org.apache.kyuubi.server.api.ApiRequestContext
import org.apache.kyuubi.server.api.v1.BatchesResource._
import org.apache.kyuubi.server.metadata.MetadataManager
import org.apache.kyuubi.server.metadata.api.Metadata
-import org.apache.kyuubi.session.{KyuubiBatchSessionImpl,
KyuubiSessionManager, SessionHandle}
+import org.apache.kyuubi.session.{KyuubiBatchSession, KyuubiSessionManager,
SessionHandle}
import org.apache.kyuubi.util.JdbcUtils
@Tag(name = "Batch")
@@ -54,45 +54,33 @@ import org.apache.kyuubi.util.JdbcUtils
private[v1] class BatchesResource extends ApiRequestContext with Logging {
private val internalRestClients = new ConcurrentHashMap[String,
InternalRestClient]()
private lazy val internalSocketTimeout =
- fe.getConf.get(KyuubiConf.BATCH_INTERNAL_REST_CLIENT_SOCKET_TIMEOUT)
+ fe.getConf.get(BATCH_INTERNAL_REST_CLIENT_SOCKET_TIMEOUT).toInt
private lazy val internalConnectTimeout =
- fe.getConf.get(KyuubiConf.BATCH_INTERNAL_REST_CLIENT_CONNECT_TIMEOUT)
+ fe.getConf.get(BATCH_INTERNAL_REST_CLIENT_CONNECT_TIMEOUT).toInt
private def getInternalRestClient(kyuubiInstance: String):
InternalRestClient = {
internalRestClients.computeIfAbsent(
kyuubiInstance,
- kyuubiInstance => {
- new InternalRestClient(
- kyuubiInstance,
- internalSocketTimeout.toInt,
- internalConnectTimeout.toInt)
- })
+ k => new InternalRestClient(k, internalSocketTimeout,
internalConnectTimeout))
}
private def sessionManager =
fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]
- private def buildBatch(session: KyuubiBatchSessionImpl): Batch = {
+ private def buildBatch(session: KyuubiBatchSession): Batch = {
val batchOp = session.batchJobSubmissionOp
val batchOpStatus = batchOp.getStatus
- val batchAppStatus = batchOp.getApplicationInfo
- val name =
Option(batchOp.batchName).getOrElse(batchAppStatus.map(_.name).orNull)
- var appId: String = null
- var appUrl: String = null
- var appState: String = null
- var appDiagnostic: String = null
-
- if (!OperationState.isTerminal(batchOpStatus.state) &&
batchAppStatus.nonEmpty) {
- appId = batchAppStatus.get.id
- appUrl = batchAppStatus.get.url.orNull
- appState = batchAppStatus.get.state.toString
- appDiagnostic = batchAppStatus.get.error.orNull
- } else {
- val metadata = sessionManager.getBatchMetadata(batchOp.batchId)
- appId = metadata.engineId
- appUrl = metadata.engineUrl
- appState = metadata.engineState
- appDiagnostic = metadata.engineError.orNull
+ val (name, appId, appUrl, appState, appDiagnostic) =
batchOp.getApplicationInfo.map { appInfo =>
+ val name = Option(batchOp.batchName).getOrElse(appInfo.name)
+ (name, appInfo.id, appInfo.url.orNull, appInfo.state.toString,
appInfo.error.orNull)
+ }.getOrElse {
+ sessionManager.getBatchMetadata(batchOp.batchId) match {
+ case Some(batch) =>
+ val diagnostic = batch.engineError.orNull
+ (batchOp.batchName, batch.engineId, batch.engineUrl,
batch.engineState, diagnostic)
+ case None =>
+ (batchOp.batchName, null, null, null, null)
+ }
}
new Batch(
@@ -185,8 +173,8 @@ private[v1] class BatchesResource extends ApiRequestContext
with Logging {
@FormDataParam("resourceFile") resourceFileInputStream: InputStream,
@FormDataParam("resourceFile") resourceFileMetadata:
FormDataContentDisposition): Batch = {
require(
- fe.getConf.get(KyuubiConf.BATCH_RESOURCE_UPLOAD_ENABLED),
- "Batch resource upload function is not enabled.")
+ fe.getConf.get(BATCH_RESOURCE_UPLOAD_ENABLED),
+ "Batch resource upload function is disabled.")
require(
batchRequest != null,
"batchRequest is required and please check the content type" +
@@ -228,7 +216,7 @@ private[v1] class BatchesResource extends ApiRequestContext
with Logging {
}
userProvidedBatchId.flatMap { batchId =>
- Option(sessionManager.getBatchFromMetadataStore(batchId))
+ sessionManager.getBatchFromMetadataStore(batchId)
} match {
case Some(batch) =>
markDuplicated(batch)
@@ -254,11 +242,17 @@ private[v1] class BatchesResource extends
ApiRequestContext with Logging {
request)
} match {
case Success(sessionHandle) =>
- buildBatch(sessionManager.getBatchSessionImpl(sessionHandle))
+ sessionManager.getBatchSession(sessionHandle) match {
+ case Some(batchSession) => buildBatch(batchSession)
+ case None => throw new IllegalStateException(
+ s"can not find batch $batchId from metadata store")
+ }
case Failure(cause) if JdbcUtils.isDuplicatedKeyDBErr(cause) =>
- val batch = sessionManager.getBatchFromMetadataStore(batchId)
- assert(batch != null, s"can not find duplicated batch $batchId
from metadata store")
- markDuplicated(batch)
+ sessionManager.getBatchFromMetadataStore(batchId) match {
+ case Some(batch) => markDuplicated(batch)
+ case None => throw new IllegalStateException(
+ s"can not find duplicated batch $batchId from metadata
store")
+ }
}
}
}
@@ -280,10 +274,10 @@ private[v1] class BatchesResource extends
ApiRequestContext with Logging {
def batchInfo(@PathParam("batchId") batchId: String): Batch = {
val userName = fe.getSessionUser(Map.empty[String, String])
val sessionHandle = formatSessionHandle(batchId)
- Option(sessionManager.getBatchSessionImpl(sessionHandle)).map {
batchSession =>
+ sessionManager.getBatchSession(sessionHandle).map { batchSession =>
buildBatch(batchSession)
}.getOrElse {
- Option(sessionManager.getBatchMetadata(batchId)).map { metadata =>
+ sessionManager.getBatchMetadata(batchId).map { metadata =>
if (OperationState.isTerminal(OperationState.withName(metadata.state))
||
metadata.kyuubiInstance == fe.connectionUrl) {
MetadataManager.buildBatch(metadata)
@@ -359,7 +353,7 @@ private[v1] class BatchesResource extends ApiRequestContext
with Logging {
@QueryParam("size") @DefaultValue("100") size: Int): OperationLog = {
val userName = fe.getSessionUser(Map.empty[String, String])
val sessionHandle = formatSessionHandle(batchId)
- Option(sessionManager.getBatchSessionImpl(sessionHandle)).map {
batchSession =>
+ sessionManager.getBatchSession(sessionHandle).map { batchSession =>
try {
val submissionOp = batchSession.batchJobSubmissionOp
val rowSet =
submissionOp.getOperationLogRowSet(FetchOrientation.FETCH_NEXT, from, size)
@@ -379,7 +373,7 @@ private[v1] class BatchesResource extends ApiRequestContext
with Logging {
throw new NotFoundException(errorMsg)
}
}.getOrElse {
- Option(sessionManager.getBatchMetadata(batchId)).map { metadata =>
+ sessionManager.getBatchMetadata(batchId).map { metadata =>
if (fe.connectionUrl != metadata.kyuubiInstance) {
val internalRestClient =
getInternalRestClient(metadata.kyuubiInstance)
internalRestClient.getBatchLocalLog(userName, batchId, from, size)
@@ -408,7 +402,7 @@ private[v1] class BatchesResource extends ApiRequestContext
with Logging {
val userName = fe.getSessionUser(hs2ProxyUser)
- Option(sessionManager.getBatchSessionImpl(sessionHandle)).map {
batchSession =>
+ sessionManager.getBatchSession(sessionHandle).map { batchSession =>
if (userName != batchSession.user) {
throw new WebApplicationException(
s"$userName is not allowed to close the session belong to
${batchSession.user}",
@@ -418,7 +412,7 @@ private[v1] class BatchesResource extends ApiRequestContext
with Logging {
val (success, msg) = batchSession.batchJobSubmissionOp.getKillMessage
new CloseBatchResponse(success, msg)
}.getOrElse {
- Option(sessionManager.getBatchMetadata(batchId)).map { metadata =>
+ sessionManager.getBatchMetadata(batchId).map { metadata =>
if (userName != metadata.username) {
throw new WebApplicationException(
s"$userName is not allowed to close the session belong to
${metadata.username}",
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
index 88a7f4e4e..17beeb62a 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
@@ -124,13 +124,13 @@ class MetadataManager extends
AbstractService("MetadataManager") {
}
}
- def getBatch(batchId: String): Batch = {
- Option(getBatchSessionMetadata(batchId)).map(buildBatch).orNull
+ def getBatch(batchId: String): Option[Batch] = {
+ getBatchSessionMetadata(batchId).map(buildBatch)
}
- def getBatchSessionMetadata(batchId: String): Metadata = {
- Option(withMetadataRequestMetrics(_metadataStore.getMetadata(batchId,
true))).filter(
- _.sessionType == SessionType.BATCH).orNull
+ def getBatchSessionMetadata(batchId: String): Option[Metadata] = {
+ Option(withMetadataRequestMetrics(_metadataStore.getMetadata(batchId,
true)))
+ .filter(_.sessionType == SessionType.BATCH)
}
def getBatches(
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
similarity index 99%
rename from
kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
rename to
kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
index ba2046829..ded4c8bf4 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
@@ -30,7 +30,7 @@ import org.apache.kyuubi.operation.OperationState
import org.apache.kyuubi.server.metadata.api.Metadata
import org.apache.kyuubi.session.SessionType.SessionType
-class KyuubiBatchSessionImpl(
+class KyuubiBatchSession(
user: String,
password: String,
ipAddress: String,
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index d5504ed19..6e74bedbf 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -121,7 +121,7 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
super.closeSession(sessionHandle)
} finally {
session match {
- case _: KyuubiBatchSessionImpl =>
+ case _: KyuubiBatchSession =>
batchLimiter.foreach(_.decrement(UserIpAddress(session.user,
session.ipAddress)))
case _ =>
limiter.foreach(_.decrement(UserIpAddress(session.user,
session.ipAddress)))
@@ -142,11 +142,11 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
batchConf: Map[String, String],
batchArgs: Seq[String],
recoveryMetadata: Option[Metadata] = None,
- shouldRunAsync: Boolean): KyuubiBatchSessionImpl = {
+ shouldRunAsync: Boolean): KyuubiBatchSession = {
// scalastyle:on
val username = Option(user).filter(_.nonEmpty).getOrElse("anonymous")
val sessionConf = this.getConf.getUserDefaults(user)
- new KyuubiBatchSessionImpl(
+ new KyuubiBatchSession(
username,
password,
ipAddress,
@@ -163,7 +163,7 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
shouldRunAsync)
}
- private[kyuubi] def openBatchSession(batchSession: KyuubiBatchSessionImpl):
SessionHandle = {
+ private[kyuubi] def openBatchSession(batchSession: KyuubiBatchSession):
SessionHandle = {
val user = batchSession.user
val ipAddress = batchSession.ipAddress
batchLimiter.foreach(_.increment(UserIpAddress(user, ipAddress)))
@@ -216,8 +216,8 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
openBatchSession(batchSession)
}
- def getBatchSessionImpl(sessionHandle: SessionHandle):
KyuubiBatchSessionImpl = {
-
getSessionOption(sessionHandle).map(_.asInstanceOf[KyuubiBatchSessionImpl]).orNull
+ def getBatchSession(sessionHandle: SessionHandle):
Option[KyuubiBatchSession] = {
+ getSessionOption(sessionHandle).map(_.asInstanceOf[KyuubiBatchSession])
}
def insertMetadata(metadata: Metadata): Unit = {
@@ -229,15 +229,15 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
}
def getMetadataRequestsRetryRef(identifier: String):
Option[MetadataRequestsRetryRef] = {
-
Option(metadataManager.map(_.getMetadataRequestsRetryRef(identifier)).orNull)
+ metadataManager.flatMap(mm =>
Option(mm.getMetadataRequestsRetryRef(identifier)))
}
def deRegisterMetadataRequestsRetryRef(identifier: String): Unit = {
metadataManager.foreach(_.deRegisterRequestsRetryRef(identifier))
}
- def getBatchFromMetadataStore(batchId: String): Batch = {
- metadataManager.map(_.getBatch(batchId)).orNull
+ def getBatchFromMetadataStore(batchId: String): Option[Batch] = {
+ metadataManager.flatMap(mm => mm.getBatch(batchId))
}
def getBatchesFromMetadataStore(
@@ -248,13 +248,13 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
endTime: Long,
from: Int,
size: Int): Seq[Batch] = {
- metadataManager.map(
- _.getBatches(batchType, batchUser, batchState, createTime, endTime,
from, size))
- .getOrElse(Seq.empty)
+ metadataManager.map { mm =>
+ mm.getBatches(batchType, batchUser, batchState, createTime, endTime,
from, size)
+ }.getOrElse(Seq.empty)
}
- def getBatchMetadata(batchId: String): Metadata = {
- metadataManager.map(_.getBatchSessionMetadata(batchId)).orNull
+ def getBatchMetadata(batchId: String): Option[Metadata] = {
+ metadataManager.flatMap(_.getBatchSessionMetadata(batchId))
}
@VisibleForTesting
@@ -273,7 +273,7 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
startEngineAliveChecker()
}
- def getBatchSessionsToRecover(kyuubiInstance: String):
Seq[KyuubiBatchSessionImpl] = {
+ def getBatchSessionsToRecover(kyuubiInstance: String):
Seq[KyuubiBatchSession] = {
Seq(OperationState.PENDING, OperationState.RUNNING).flatMap {
stateToRecover =>
metadataManager.map(_.getBatchesRecoveryMetadata(
stateToRecover.toString,
@@ -349,7 +349,7 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
private def startEngineAliveChecker(): Unit = {
val interval = conf.get(KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL)
val checkTask: Runnable = () => {
- allSessions.foreach { session =>
+ allSessions().foreach { session =>
if (!session.asInstanceOf[KyuubiSessionImpl].checkEngineAlive()) {
try {
closeSession(session.handle)
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
index f2ec60fea..2ed413dcb 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
@@ -31,7 +31,7 @@ import org.apache.kyuubi.engine.ApplicationState._
import org.apache.kyuubi.operation.{FetchOrientation, HiveJDBCTestHelper,
OperationState}
import org.apache.kyuubi.operation.OperationState.ERROR
import org.apache.kyuubi.server.MiniYarnService
-import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, KyuubiSessionManager}
+import org.apache.kyuubi.session.{KyuubiBatchSession, KyuubiSessionManager}
/**
* To developers:
@@ -119,7 +119,7 @@ class KyuubiOperationYarnClusterSuite extends
WithKyuubiServerOnYarn with HiveJD
batchRequest.getConf.asScala.toMap,
batchRequest)
- val session =
sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSessionImpl]
+ val session =
sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSession]
val batchJobSubmissionOp = session.batchJobSubmissionOp
eventually(timeout(3.minutes), interval(50.milliseconds)) {
@@ -130,8 +130,8 @@ class KyuubiOperationYarnClusterSuite extends
WithKyuubiServerOnYarn with HiveJD
eventually(timeout(10.seconds)) {
val metadata =
session.sessionManager.getBatchMetadata(session.handle.identifier.toString)
- assert(metadata.state === "RUNNING")
- assert(metadata.engineId.startsWith("application_"))
+ assert(metadata.map(_.state).contains("RUNNING"))
+ assert(metadata.map(_.engineId).get.startsWith("application_"))
}
val killResponse =
yarnOperation.killApplicationByTag(sessionHandle.identifier.toString)
@@ -179,7 +179,7 @@ class KyuubiOperationYarnClusterSuite extends
WithKyuubiServerOnYarn with HiveJD
batchRequest.getConf.asScala.toMap,
batchRequest)
- val session =
sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSessionImpl]
+ val session =
sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSession]
val batchJobSubmissionOp = session.batchJobSubmissionOp
eventually(timeout(3.minutes), interval(50.milliseconds)) {
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
index dc59d75e3..62a7fec13 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
@@ -46,7 +46,7 @@ import org.apache.kyuubi.server.KyuubiRestFrontendService
import
org.apache.kyuubi.server.http.authentication.AuthenticationHandler.AUTHORIZATION_HEADER
import org.apache.kyuubi.server.metadata.api.Metadata
import org.apache.kyuubi.service.authentication.KyuubiAuthenticationFactory
-import org.apache.kyuubi.session.{KyuubiBatchSessionImpl,
KyuubiSessionManager, SessionHandle, SessionType}
+import org.apache.kyuubi.session.{KyuubiBatchSession, KyuubiSessionManager,
SessionHandle, SessionType}
class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper
with BatchTestHelper {
override protected lazy val conf: KyuubiConf = KyuubiConf()
@@ -464,8 +464,8 @@ class BatchesResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper wi
sessionManager.insertMetadata(batchMetadata)
sessionManager.insertMetadata(batchMetadata2)
-
assert(sessionManager.getBatchFromMetadataStore(batchId1).getState.equals("PENDING"))
-
assert(sessionManager.getBatchFromMetadataStore(batchId2).getState.equals("PENDING"))
+
assert(sessionManager.getBatchFromMetadataStore(batchId1).map(_.getState).contains("PENDING"))
+
assert(sessionManager.getBatchFromMetadataStore(batchId2).map(_.getState).contains("PENDING"))
val sparkBatchProcessBuilder = new SparkBatchProcessBuilder(
"kyuubi",
@@ -501,8 +501,8 @@ class BatchesResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper wi
val sessionHandle1 = SessionHandle.fromUUID(batchId1)
val sessionHandle2 = SessionHandle.fromUUID(batchId2)
- val session1 =
sessionManager.getSession(sessionHandle1).asInstanceOf[KyuubiBatchSessionImpl]
- val session2 =
sessionManager.getSession(sessionHandle2).asInstanceOf[KyuubiBatchSessionImpl]
+ val session1 =
sessionManager.getSession(sessionHandle1).asInstanceOf[KyuubiBatchSession]
+ val session2 =
sessionManager.getSession(sessionHandle2).asInstanceOf[KyuubiBatchSession]
assert(session1.createTime === batchMetadata.createTime)
assert(session2.createTime === batchMetadata2.createTime)
@@ -638,8 +638,8 @@ class BatchesResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper wi
.post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
assert(200 == response.getStatus)
val batch = response.readEntity(classOf[Batch])
- val batchSession =
sessionManager.getBatchSessionImpl(SessionHandle.fromUUID(batch.getId))
- assert(batchSession.ipAddress === realClientIp)
+ val batchSession =
sessionManager.getBatchSession(SessionHandle.fromUUID(batch.getId))
+ assert(batchSession.map(_.ipAddress).contains(realClientIp))
}
test("expose the metrics with operation type and current state") {
@@ -708,6 +708,6 @@ class BatchesResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper wi
val sessionHandleRegex = "\\[[\\S]*\\]".r
val batchId =
sessionHandleRegex.findFirstMatchIn(e.getMessage).get.group(0)
.replaceAll("\\[", "").replaceAll("\\]", "")
- assert(sessionManager.getBatchMetadata(batchId).state == "CANCELED")
+
assert(sessionManager.getBatchMetadata(batchId).map(_.state).contains("CANCELED"))
}
}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala
index 75c935a3d..8064b7f1f 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala
@@ -66,7 +66,7 @@ class MetadataManagerSuite extends KyuubiFunSuite {
retryRef.addRetryingMetadataRequest(UpdateMetadata(metadataToUpdate))
eventually(timeout(3.seconds)) {
assert(retryRef.hasRemainingRequests())
- assert(metadataManager.getBatch(metadata.identifier).getState ===
"PENDING")
+
assert(metadataManager.getBatch(metadata.identifier).map(_.getState).contains("PENDING"))
}
val metadata2 = metadata.copy(identifier = UUID.randomUUID().toString)
@@ -84,7 +84,7 @@ class MetadataManagerSuite extends KyuubiFunSuite {
eventually(timeout(3.seconds)) {
assert(!retryRef2.hasRemainingRequests())
- assert(metadataManager.getBatch(metadata2.identifier).getState ===
"RUNNING")
+
assert(metadataManager.getBatch(metadata2.identifier).map(_.getState).contains("RUNNING"))
}
metadataManager.identifierRequestsAsyncRetryRefs.clear()