This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.8 by this push:
new b222bd2db [KYUUBI #5243] Distinguish metadata between batch impl v2
and recovery
b222bd2db is described below
commit b222bd2db31ffa6219c560ef276fef62146c417b
Author: Cheng Pan <[email protected]>
AuthorDate: Wed Sep 6 02:51:43 2023 +0800
[KYUUBI #5243] Distinguish metadata between batch impl v2 and recovery
### _Why are the changes needed?_
The `recoveryMetadata` is not accurate after batch impl is introduced. This
PR proposes to rename `recoveryMetadata` to `metadata` and introduce a
dedicated flay `fromRecovery` to distinguish metadata between them.
This PR also partially reverts #4798, by removing unnecessary constructor
parameters `shouldRunAsync` and `batchConf`
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
locally before make a pull request
### _Was this patch authored or co-authored using generative AI tooling?_
No.
Closes #5243 from pan3793/meta-recov.
Closes #5243
0718fbefe [Cheng Pan] nit
b8358464c [Cheng Pan] simplify
a2d6519c6 [Cheng Pan] fix test
2dad868bd [Cheng Pan] refactor
f83d2a602 [Cheng Pan] Distinguish batch impl v2 metadata from recovery
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
(cherry picked from commit 6a23f88b00e03b36f8f68c526128b166f866dc6f)
Signed-off-by: Cheng Pan <[email protected]>
---
.../test/spark/SparkOnKubernetesTestsSuite.scala | 3 -
.../kyuubi/operation/BatchJobSubmission.scala | 45 +++++++------
.../kyuubi/operation/KyuubiOperationManager.scala | 6 +-
.../apache/kyuubi/server/KyuubiBatchService.scala | 14 +---
.../kyuubi/server/api/v1/BatchesResource.scala | 1 -
.../apache/kyuubi/session/KyuubiBatchSession.scala | 77 +++++++++++++---------
.../kyuubi/session/KyuubiSessionManager.scala | 22 +++----
.../org/apache/kyuubi/WithKyuubiServerOnYarn.scala | 2 -
.../ServerJsonLoggingEventHandlerSuite.scala | 3 +-
.../server/api/v1/BatchesResourceSuite.scala | 22 ++++---
.../kyuubi/server/rest/client/BatchCliSuite.scala | 12 ++--
11 files changed, 102 insertions(+), 105 deletions(-)
diff --git
a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
index 037681a3f..3f591e604 100644
---
a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
+++
b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
@@ -19,7 +19,6 @@ package org.apache.kyuubi.kubernetes.test.spark
import java.util.UUID
-import scala.collection.JavaConverters._
import scala.concurrent.duration._
import org.apache.hadoop.conf.Configuration
@@ -149,7 +148,6 @@ class KyuubiOperationKubernetesClusterClientModeSuite
"kyuubi",
"passwd",
"localhost",
- batchRequest.getConf.asScala.toMap,
batchRequest)
eventually(timeout(3.minutes), interval(50.milliseconds)) {
@@ -217,7 +215,6 @@ class KyuubiOperationKubernetesClusterClusterModeSuite
"runner",
"passwd",
"localhost",
- batchRequest.getConf.asScala.toMap,
batchRequest)
// wait for driver pod start
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index ac723b2c6..4ea609540 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -58,11 +58,12 @@ class BatchJobSubmission(
className: String,
batchConf: Map[String, String],
batchArgs: Seq[String],
- recoveryMetadata: Option[Metadata],
- override val shouldRunAsync: Boolean)
+ metadata: Option[Metadata])
extends KyuubiApplicationOperation(session) {
import BatchJobSubmission._
+ override def shouldRunAsync: Boolean = true
+
private val _operationLog = OperationLog.createOperationLog(session,
getHandle)
private val applicationManager = session.sessionManager.applicationManager
@@ -75,7 +76,7 @@ class BatchJobSubmission(
private var killMessage: KillResponse = (false, "UNKNOWN")
def getKillMessage: KillResponse = killMessage
- @volatile private var _appStartTime =
recoveryMetadata.map(_.engineOpenTime).getOrElse(0L)
+ @volatile private var _appStartTime =
metadata.map(_.engineOpenTime).getOrElse(0L)
def appStartTime: Long = _appStartTime
def appStarted: Boolean = _appStartTime > 0
@@ -184,21 +185,24 @@ class BatchJobSubmission(
override protected def runInternal(): Unit = session.handleSessionException {
val asyncOperation: Runnable = () => {
try {
- recoveryMetadata match {
+ metadata match {
case Some(metadata) if metadata.peerInstanceClosed =>
setState(OperationState.CANCELED)
case Some(metadata) if metadata.state ==
OperationState.PENDING.toString =>
- // In recovery mode, only submit batch job when previous state is
PENDING
- // and fail to fetch the status including appId from resource
manager.
- // Otherwise, monitor the submitted batch application.
+ // case 1: new batch job created using batch impl v2
+ // case 2: batch job from recovery, do submission only when
previous state is
+ // PENDING and fail to fetch the status by appId from resource
manager, which
+ // is similar with case 1; otherwise, monitor the submitted batch
application.
_applicationInfo = currentApplicationInfo()
applicationId(_applicationInfo) match {
- case Some(appId) => monitorBatchJob(appId)
case None => submitAndMonitorBatchJob()
+ case Some(appId) => monitorBatchJob(appId)
}
case Some(metadata) =>
+ // batch job from recovery which was submitted
monitorBatchJob(metadata.engineId)
case None =>
+ // brand-new job created using batch impl v1
submitAndMonitorBatchJob()
}
setStateIfNotCanceled(OperationState.FINISHED)
@@ -219,7 +223,6 @@ class BatchJobSubmission(
updateBatchMetadata()
}
}
- if (!shouldRunAsync) getBackgroundHandle.get()
}
private def submitAndMonitorBatchJob(): Unit = {
@@ -295,19 +298,19 @@ class BatchJobSubmission(
}
if (_applicationInfo.isEmpty) {
info(s"The $batchType batch[$batchId] job: $appId not found, assume that
it has finished.")
- } else if (applicationFailed(_applicationInfo)) {
+ return
+ }
+ if (applicationFailed(_applicationInfo)) {
+ throw new KyuubiException(s"$batchType batch[$batchId] job failed:
${_applicationInfo}")
+ }
+ updateBatchMetadata()
+ // TODO: add limit for max batch job submission lifetime
+ while (_applicationInfo.isDefined &&
!applicationTerminated(_applicationInfo)) {
+ Thread.sleep(applicationCheckInterval)
+ updateApplicationInfoMetadataIfNeeded()
+ }
+ if (applicationFailed(_applicationInfo)) {
throw new KyuubiException(s"$batchType batch[$batchId] job failed:
${_applicationInfo}")
- } else {
- updateBatchMetadata()
- // TODO: add limit for max batch job submission lifetime
- while (_applicationInfo.isDefined &&
!applicationTerminated(_applicationInfo)) {
- Thread.sleep(applicationCheckInterval)
- updateApplicationInfoMetadataIfNeeded()
- }
-
- if (applicationFailed(_applicationInfo)) {
- throw new KyuubiException(s"$batchType batch[$batchId] job failed:
${_applicationInfo}")
- }
}
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
index 8ae9c91f8..739c99cd7 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
@@ -81,8 +81,7 @@ class KyuubiOperationManager private (name: String) extends
OperationManager(nam
className: String,
batchConf: Map[String, String],
batchArgs: Seq[String],
- recoveryMetadata: Option[Metadata],
- shouldRunAsync: Boolean): BatchJobSubmission = {
+ metadata: Option[Metadata]): BatchJobSubmission = {
val operation = new BatchJobSubmission(
session,
batchType,
@@ -91,8 +90,7 @@ class KyuubiOperationManager private (name: String) extends
OperationManager(nam
className,
batchConf,
batchArgs,
- recoveryMetadata,
- shouldRunAsync)
+ metadata)
addOperation(operation)
operation
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
index 7ed2ab8e1..bf10a68fa 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
@@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicBoolean
import org.apache.kyuubi.config.KyuubiConf.BATCH_SUBMITTER_THREADS
import org.apache.kyuubi.operation.OperationState
import org.apache.kyuubi.server.metadata.MetadataManager
-import org.apache.kyuubi.server.metadata.api.Metadata
import org.apache.kyuubi.service.{AbstractService, Serverable}
import org.apache.kyuubi.session.KyuubiSessionManager
import org.apache.kyuubi.util.ThreadUtils
@@ -81,16 +80,9 @@ class KyuubiBatchService(
Option(metadata.requestName),
metadata.resource,
metadata.className,
- metadata.requestConf,
metadata.requestArgs,
- Some(metadata), // TODO some logic need to fix since it's not
from recovery
- shouldRunAsync = true)
- val metadataForUpdate = Metadata(
- identifier = batchId,
- kyuubiInstance = kyuubiInstance,
- requestConf = batchSession.optimizedConf,
- clusterManager =
batchSession.batchJobSubmissionOp.builder.clusterManager())
- metadataManager.updateMetadata(metadataForUpdate,
asyncRetryOnError = false)
+ Some(metadata),
+ fromRecovery = false)
val sessionHandle = sessionManager.openBatchSession(batchSession)
var submitted = false
while (!submitted) { // block until batch job submitted
@@ -113,7 +105,7 @@ class KyuubiBatchService(
// }
if (!submitted) Thread.sleep(1000)
}
- info(s"$batchId is submitted.")
+ info(s"$batchId is submitted or finished.")
}
}
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index d7e8b615a..12db68aeb 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -269,7 +269,6 @@ private[v1] class BatchesResource extends ApiRequestContext
with Logging {
userName,
"anonymous",
ipAddress,
- request.getConf.asScala.toMap,
request)
} match {
case Success(sessionHandle) =>
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
index 014bbced3..c8563bca1 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
@@ -41,10 +41,9 @@ class KyuubiBatchSession(
batchName: Option[String],
resource: String,
className: String,
- batchConf: Map[String, String],
batchArgs: Seq[String],
- recoveryMetadata: Option[Metadata] = None,
- shouldRunAsync: Boolean)
+ metadata: Option[Metadata] = None,
+ fromRecovery: Boolean)
extends KyuubiSession(
TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1,
user,
@@ -55,11 +54,11 @@ class KyuubiBatchSession(
override val sessionType: SessionType = SessionType.BATCH
override val handle: SessionHandle = {
- val batchId =
recoveryMetadata.map(_.identifier).getOrElse(conf(KYUUBI_BATCH_ID_KEY))
+ val batchId =
metadata.map(_.identifier).getOrElse(conf(KYUUBI_BATCH_ID_KEY))
SessionHandle.fromUUID(batchId)
}
- override def createTime: Long =
recoveryMetadata.map(_.createTime).getOrElse(super.createTime)
+ override def createTime: Long =
metadata.map(_.createTime).getOrElse(super.createTime)
override def getNoOperationTime: Long = {
if (batchJobSubmissionOp != null && !OperationState.isTerminal(
@@ -74,7 +73,7 @@ class KyuubiBatchSession(
sessionManager.getConf.get(KyuubiConf.BATCH_SESSION_IDLE_TIMEOUT)
override val normalizedConf: Map[String, String] =
- sessionConf.getBatchConf(batchType) ++
sessionManager.validateBatchConf(batchConf)
+ sessionConf.getBatchConf(batchType) ++
sessionManager.validateBatchConf(conf)
val optimizedConf: Map[String, String] = {
val confOverlay = sessionManager.sessionConfAdvisor.getConfOverlay(
@@ -95,7 +94,7 @@ class KyuubiBatchSession(
// whether the resource file is from uploading
private[kyuubi] val isResourceUploaded: Boolean =
- batchConf.getOrElse(KyuubiReservedKeys.KYUUBI_BATCH_RESOURCE_UPLOADED_KEY,
"false").toBoolean
+ conf.getOrElse(KyuubiReservedKeys.KYUUBI_BATCH_RESOURCE_UPLOADED_KEY,
"false").toBoolean
private[kyuubi] lazy val batchJobSubmissionOp =
sessionManager.operationManager
.newBatchJobSubmissionOperation(
@@ -106,8 +105,7 @@ class KyuubiBatchSession(
className,
optimizedConf,
batchArgs,
- recoveryMetadata,
- shouldRunAsync)
+ metadata)
private def waitMetadataRequestsRetryCompletion(): Unit = {
val batchId = batchJobSubmissionOp.batchId
@@ -122,7 +120,9 @@ class KyuubiBatchSession(
}
private val sessionEvent = KyuubiSessionEvent(this)
- recoveryMetadata.foreach(metadata => sessionEvent.engineId =
metadata.engineId)
+ if (fromRecovery) {
+ metadata.foreach { m => sessionEvent.engineId = m.engineId }
+ }
EventBus.post(sessionEvent)
override def getSessionEvent: Option[KyuubiSessionEvent] = {
@@ -142,32 +142,47 @@ class KyuubiBatchSession(
override def open(): Unit = handleSessionException {
traceMetricsOnOpen()
- if (recoveryMetadata.isEmpty) {
+ lazy val kubernetesInfo: Map[String, String] = {
val appMgrInfo = batchJobSubmissionOp.builder.appMgrInfo()
- val kubernetesInfo = appMgrInfo.kubernetesInfo.context.map { context =>
+ appMgrInfo.kubernetesInfo.context.map { context =>
Map(KyuubiConf.KUBERNETES_CONTEXT.key -> context)
}.getOrElse(Map.empty) ++ appMgrInfo.kubernetesInfo.namespace.map {
namespace =>
Map(KyuubiConf.KUBERNETES_NAMESPACE.key -> namespace)
}.getOrElse(Map.empty)
- val metaData = Metadata(
- identifier = handle.identifier.toString,
- sessionType = sessionType,
- realUser = realUser,
- username = user,
- ipAddress = ipAddress,
- kyuubiInstance = connectionUrl,
- state = OperationState.PENDING.toString,
- resource = resource,
- className = className,
- requestName = name.orNull,
- requestConf = optimizedConf ++ kubernetesInfo, // save the kubernetes
info into request conf
- requestArgs = batchArgs,
- createTime = createTime,
- engineType = batchType,
- clusterManager = batchJobSubmissionOp.builder.clusterManager())
-
- // there is a chance that operation failed w/ duplicated key error
- sessionManager.insertMetadata(metaData)
+ }
+
+ (metadata, fromRecovery) match {
+ case (Some(initialMetadata), false) =>
+ // new batch job created using batch impl v2
+ val metadataToUpdate = Metadata(
+ identifier = initialMetadata.identifier,
+ kyuubiInstance = connectionUrl,
+ requestName = name.orNull,
+ requestConf = optimizedConf ++ kubernetesInfo, // save the
kubernetes info
+ clusterManager = batchJobSubmissionOp.builder.clusterManager())
+ sessionManager.updateMetadata(metadataToUpdate)
+ case (None, _) =>
+ // new batch job created using batch impl v1
+ val newMetadata = Metadata(
+ identifier = handle.identifier.toString,
+ sessionType = sessionType,
+ realUser = realUser,
+ username = user,
+ ipAddress = ipAddress,
+ kyuubiInstance = connectionUrl,
+ state = OperationState.PENDING.toString,
+ resource = resource,
+ className = className,
+ requestName = name.orNull,
+ requestConf = optimizedConf ++ kubernetesInfo, // save the
kubernetes info
+ requestArgs = batchArgs,
+ createTime = createTime,
+ engineType = batchType,
+ clusterManager = batchJobSubmissionOp.builder.clusterManager())
+
+ // there is a chance that operation failed w/ duplicated key error
+ sessionManager.insertMetadata(newMetadata)
+ case _ =>
}
checkSessionAccessPathURIs()
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 19259bb1b..8d3234699 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -144,10 +144,9 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
batchName: Option[String],
resource: String,
className: String,
- batchConf: Map[String, String],
batchArgs: Seq[String],
- recoveryMetadata: Option[Metadata] = None,
- shouldRunAsync: Boolean): KyuubiBatchSession = {
+ metadata: Option[Metadata] = None,
+ fromRecovery: Boolean): KyuubiBatchSession = {
// scalastyle:on
val username = Option(user).filter(_.nonEmpty).getOrElse("anonymous")
val sessionConf = this.getConf.getUserDefaults(user)
@@ -162,10 +161,9 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
batchName,
resource,
className,
- batchConf,
batchArgs,
- recoveryMetadata,
- shouldRunAsync)
+ metadata,
+ fromRecovery)
}
private[kyuubi] def openBatchSession(batchSession: KyuubiBatchSession):
SessionHandle = {
@@ -202,22 +200,19 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
user: String,
password: String,
ipAddress: String,
- conf: Map[String, String],
- batchRequest: BatchRequest,
- shouldRunAsync: Boolean = true): SessionHandle = {
+ batchRequest: BatchRequest): SessionHandle = {
val batchSession = createBatchSession(
user,
password,
ipAddress,
- conf,
+ batchRequest.getConf.asScala.toMap,
batchRequest.getBatchType,
Option(batchRequest.getName),
batchRequest.getResource,
batchRequest.getClassName,
- batchRequest.getConf.asScala.toMap,
batchRequest.getArgs.asScala.toSeq,
None,
- shouldRunAsync)
+ fromRecovery = false)
openBatchSession(batchSession)
}
@@ -313,10 +308,9 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
Option(metadata.requestName),
metadata.resource,
metadata.className,
- metadata.requestConf,
metadata.requestArgs,
Some(metadata),
- shouldRunAsync = true)
+ fromRecovery = true)
}).getOrElse(Seq.empty)
}
}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
index 7a4bfea1b..e4382a859 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
@@ -116,7 +116,6 @@ class KyuubiOperationYarnClusterSuite extends
WithKyuubiServerOnYarn with HiveJD
"kyuubi",
"passwd",
"localhost",
- batchRequest.getConf.asScala.toMap,
batchRequest)
val session =
sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSession]
@@ -180,7 +179,6 @@ class KyuubiOperationYarnClusterSuite extends
WithKyuubiServerOnYarn with HiveJD
"kyuubi",
"passwd",
"localhost",
- batchRequest.getConf.asScala.toMap,
batchRequest)
val session =
sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSession]
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerJsonLoggingEventHandlerSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerJsonLoggingEventHandlerSuite.scala
index 7c79d6a87..2f794ed48 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerJsonLoggingEventHandlerSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerJsonLoggingEventHandlerSuite.scala
@@ -135,13 +135,12 @@ class ServerJsonLoggingEventHandlerSuite extends
WithKyuubiServer with HiveJDBCT
}
}
- val batchRequest = newSparkBatchRequest()
+ val batchRequest = newSparkBatchRequest(Map(KYUUBI_BATCH_ID_KEY ->
UUID.randomUUID().toString))
val sessionMgr =
server.backendService.sessionManager.asInstanceOf[KyuubiSessionManager]
val batchSessionHandle = sessionMgr.openBatchSession(
Utils.currentUser,
"kyuubi",
"127.0.0.1",
- Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString),
batchRequest)
withSessionConf()(Map.empty)(Map("spark.sql.shuffle.partitions" -> "2")) {
withJdbcStatement() { statement =>
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
index 504b9ac26..3a47461a2 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
@@ -358,12 +358,12 @@ abstract class BatchesResourceSuiteBase extends
KyuubiFunSuite
"kyuubi",
"kyuubi",
InetAddress.getLocalHost.getCanonicalHostName,
- Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString),
newBatchRequest(
"spark",
sparkBatchTestResource.get,
"",
- ""))
+ "",
+ Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString)))
sessionManager.openSession(
TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V11,
"",
@@ -380,22 +380,22 @@ abstract class BatchesResourceSuiteBase extends
KyuubiFunSuite
"kyuubi",
"kyuubi",
InetAddress.getLocalHost.getCanonicalHostName,
- Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString),
newBatchRequest(
"spark",
sparkBatchTestResource.get,
"",
- ""))
+ "",
+ Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString)))
sessionManager.openBatchSession(
"kyuubi",
"kyuubi",
InetAddress.getLocalHost.getCanonicalHostName,
- Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString),
newBatchRequest(
"spark",
sparkBatchTestResource.get,
"",
- ""))
+ "",
+ Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString)))
val response2 = webTarget.path("api/v1/batches")
.queryParam("batchType", "spark")
@@ -780,12 +780,14 @@ abstract class BatchesResourceSuiteBase extends
KyuubiFunSuite
.be.sessionManager.asInstanceOf[KyuubiSessionManager]
val e = intercept[Exception] {
+ val conf = Map(
+ KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString,
+ "spark.jars" -> "disAllowPath")
sessionManager.openBatchSession(
"kyuubi",
"kyuubi",
InetAddress.getLocalHost.getCanonicalHostName,
- Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString),
- newSparkBatchRequest(Map("spark.jars" -> "disAllowPath")))
+ newSparkBatchRequest(conf))
}
val sessionHandleRegex = "\\[\\S*]".r
val batchId =
sessionHandleRegex.findFirstMatchIn(e.getMessage).get.group(0)
@@ -803,12 +805,12 @@ abstract class BatchesResourceSuiteBase extends
KyuubiFunSuite
"kyuubi",
"kyuubi",
InetAddress.getLocalHost.getCanonicalHostName,
- Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString),
newBatchRequest(
"spark",
sparkBatchTestResource.get,
"",
- uniqueName))
+ uniqueName,
+ Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString)))
val response = webTarget.path("api/v1/batches")
.queryParam("batchName", uniqueName)
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
index 0c44fc3a8..bcf8c450e 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
@@ -290,12 +290,12 @@ class BatchCliSuite extends RestClientTestHelper with
TestPrematureExit with Bat
"kyuubi",
"kyuubi",
InetAddress.getLocalHost.getCanonicalHostName,
- Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString),
newBatchRequest(
"spark",
"",
"",
- ""))
+ "",
+ Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString)))
sessionManager.openSession(
TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V11,
"",
@@ -312,22 +312,22 @@ class BatchCliSuite extends RestClientTestHelper with
TestPrematureExit with Bat
"kyuubi",
"kyuubi",
InetAddress.getLocalHost.getCanonicalHostName,
- Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString),
newBatchRequest(
"spark",
"",
"",
- ""))
+ "",
+ Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString)))
sessionManager.openBatchSession(
"kyuubi",
"kyuubi",
InetAddress.getLocalHost.getCanonicalHostName,
- Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString),
newBatchRequest(
"spark",
"",
"",
- ""))
+ "",
+ Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString)))
val listArgs = Array(
"list",