This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.8 by this push:
     new b222bd2db [KYUUBI #5243] Distinguish metadata between batch impl v2 
and recovery
b222bd2db is described below

commit b222bd2db31ffa6219c560ef276fef62146c417b
Author: Cheng Pan <[email protected]>
AuthorDate: Wed Sep 6 02:51:43 2023 +0800

    [KYUUBI #5243] Distinguish metadata between batch impl v2 and recovery
    
    ### _Why are the changes needed?_
    
    The `recoveryMetadata` is not accurate after batch impl is introduced. This 
PR proposes to rename `recoveryMetadata` to `metadata` and introduce a 
dedicated flay `fromRecovery` to distinguish metadata between them.
    
    This PR also partially reverts #4798, by removing unnecessary constructor 
parameters `shouldRunAsync` and `batchConf`
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run 
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
 locally before make a pull request
    
    ### _Was this patch authored or co-authored using generative AI tooling?_
    
    No.
    
    Closes #5243 from pan3793/meta-recov.
    
    Closes #5243
    
    0718fbefe [Cheng Pan] nit
    b8358464c [Cheng Pan] simplify
    a2d6519c6 [Cheng Pan] fix test
    2dad868bd [Cheng Pan] refactor
    f83d2a602 [Cheng Pan] Distinguish batch impl v2 metadata from recovery
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
    (cherry picked from commit 6a23f88b00e03b36f8f68c526128b166f866dc6f)
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../test/spark/SparkOnKubernetesTestsSuite.scala   |  3 -
 .../kyuubi/operation/BatchJobSubmission.scala      | 45 +++++++------
 .../kyuubi/operation/KyuubiOperationManager.scala  |  6 +-
 .../apache/kyuubi/server/KyuubiBatchService.scala  | 14 +---
 .../kyuubi/server/api/v1/BatchesResource.scala     |  1 -
 .../apache/kyuubi/session/KyuubiBatchSession.scala | 77 +++++++++++++---------
 .../kyuubi/session/KyuubiSessionManager.scala      | 22 +++----
 .../org/apache/kyuubi/WithKyuubiServerOnYarn.scala |  2 -
 .../ServerJsonLoggingEventHandlerSuite.scala       |  3 +-
 .../server/api/v1/BatchesResourceSuite.scala       | 22 ++++---
 .../kyuubi/server/rest/client/BatchCliSuite.scala  | 12 ++--
 11 files changed, 102 insertions(+), 105 deletions(-)

diff --git 
a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
 
b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
index 037681a3f..3f591e604 100644
--- 
a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
+++ 
b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
@@ -19,7 +19,6 @@ package org.apache.kyuubi.kubernetes.test.spark
 
 import java.util.UUID
 
-import scala.collection.JavaConverters._
 import scala.concurrent.duration._
 
 import org.apache.hadoop.conf.Configuration
@@ -149,7 +148,6 @@ class KyuubiOperationKubernetesClusterClientModeSuite
       "kyuubi",
       "passwd",
       "localhost",
-      batchRequest.getConf.asScala.toMap,
       batchRequest)
 
     eventually(timeout(3.minutes), interval(50.milliseconds)) {
@@ -217,7 +215,6 @@ class KyuubiOperationKubernetesClusterClusterModeSuite
       "runner",
       "passwd",
       "localhost",
-      batchRequest.getConf.asScala.toMap,
       batchRequest)
 
     // wait for driver pod start
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index ac723b2c6..4ea609540 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -58,11 +58,12 @@ class BatchJobSubmission(
     className: String,
     batchConf: Map[String, String],
     batchArgs: Seq[String],
-    recoveryMetadata: Option[Metadata],
-    override val shouldRunAsync: Boolean)
+    metadata: Option[Metadata])
   extends KyuubiApplicationOperation(session) {
   import BatchJobSubmission._
 
+  override def shouldRunAsync: Boolean = true
+
   private val _operationLog = OperationLog.createOperationLog(session, 
getHandle)
 
   private val applicationManager = session.sessionManager.applicationManager
@@ -75,7 +76,7 @@ class BatchJobSubmission(
   private var killMessage: KillResponse = (false, "UNKNOWN")
   def getKillMessage: KillResponse = killMessage
 
-  @volatile private var _appStartTime = 
recoveryMetadata.map(_.engineOpenTime).getOrElse(0L)
+  @volatile private var _appStartTime = 
metadata.map(_.engineOpenTime).getOrElse(0L)
   def appStartTime: Long = _appStartTime
   def appStarted: Boolean = _appStartTime > 0
 
@@ -184,21 +185,24 @@ class BatchJobSubmission(
   override protected def runInternal(): Unit = session.handleSessionException {
     val asyncOperation: Runnable = () => {
       try {
-        recoveryMetadata match {
+        metadata match {
           case Some(metadata) if metadata.peerInstanceClosed =>
             setState(OperationState.CANCELED)
           case Some(metadata) if metadata.state == 
OperationState.PENDING.toString =>
-            // In recovery mode, only submit batch job when previous state is 
PENDING
-            // and fail to fetch the status including appId from resource 
manager.
-            // Otherwise, monitor the submitted batch application.
+            // case 1: new batch job created using batch impl v2
+            // case 2: batch job from recovery, do submission only when 
previous state is
+            // PENDING and fail to fetch the status by appId from resource 
manager, which
+            // is similar with case 1; otherwise, monitor the submitted batch 
application.
             _applicationInfo = currentApplicationInfo()
             applicationId(_applicationInfo) match {
-              case Some(appId) => monitorBatchJob(appId)
               case None => submitAndMonitorBatchJob()
+              case Some(appId) => monitorBatchJob(appId)
             }
           case Some(metadata) =>
+            // batch job from recovery which was submitted
             monitorBatchJob(metadata.engineId)
           case None =>
+            // brand-new job created using batch impl v1
             submitAndMonitorBatchJob()
         }
         setStateIfNotCanceled(OperationState.FINISHED)
@@ -219,7 +223,6 @@ class BatchJobSubmission(
         updateBatchMetadata()
       }
     }
-    if (!shouldRunAsync) getBackgroundHandle.get()
   }
 
   private def submitAndMonitorBatchJob(): Unit = {
@@ -295,19 +298,19 @@ class BatchJobSubmission(
     }
     if (_applicationInfo.isEmpty) {
       info(s"The $batchType batch[$batchId] job: $appId not found, assume that 
it has finished.")
-    } else if (applicationFailed(_applicationInfo)) {
+      return
+    }
+    if (applicationFailed(_applicationInfo)) {
+      throw new KyuubiException(s"$batchType batch[$batchId] job failed: 
${_applicationInfo}")
+    }
+    updateBatchMetadata()
+    // TODO: add limit for max batch job submission lifetime
+    while (_applicationInfo.isDefined && 
!applicationTerminated(_applicationInfo)) {
+      Thread.sleep(applicationCheckInterval)
+      updateApplicationInfoMetadataIfNeeded()
+    }
+    if (applicationFailed(_applicationInfo)) {
       throw new KyuubiException(s"$batchType batch[$batchId] job failed: 
${_applicationInfo}")
-    } else {
-      updateBatchMetadata()
-      // TODO: add limit for max batch job submission lifetime
-      while (_applicationInfo.isDefined && 
!applicationTerminated(_applicationInfo)) {
-        Thread.sleep(applicationCheckInterval)
-        updateApplicationInfoMetadataIfNeeded()
-      }
-
-      if (applicationFailed(_applicationInfo)) {
-        throw new KyuubiException(s"$batchType batch[$batchId] job failed: 
${_applicationInfo}")
-      }
     }
   }
 
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
index 8ae9c91f8..739c99cd7 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
@@ -81,8 +81,7 @@ class KyuubiOperationManager private (name: String) extends 
OperationManager(nam
       className: String,
       batchConf: Map[String, String],
       batchArgs: Seq[String],
-      recoveryMetadata: Option[Metadata],
-      shouldRunAsync: Boolean): BatchJobSubmission = {
+      metadata: Option[Metadata]): BatchJobSubmission = {
     val operation = new BatchJobSubmission(
       session,
       batchType,
@@ -91,8 +90,7 @@ class KyuubiOperationManager private (name: String) extends 
OperationManager(nam
       className,
       batchConf,
       batchArgs,
-      recoveryMetadata,
-      shouldRunAsync)
+      metadata)
     addOperation(operation)
     operation
   }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
index 7ed2ab8e1..bf10a68fa 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
@@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicBoolean
 import org.apache.kyuubi.config.KyuubiConf.BATCH_SUBMITTER_THREADS
 import org.apache.kyuubi.operation.OperationState
 import org.apache.kyuubi.server.metadata.MetadataManager
-import org.apache.kyuubi.server.metadata.api.Metadata
 import org.apache.kyuubi.service.{AbstractService, Serverable}
 import org.apache.kyuubi.session.KyuubiSessionManager
 import org.apache.kyuubi.util.ThreadUtils
@@ -81,16 +80,9 @@ class KyuubiBatchService(
               Option(metadata.requestName),
               metadata.resource,
               metadata.className,
-              metadata.requestConf,
               metadata.requestArgs,
-              Some(metadata), // TODO some logic need to fix since it's not 
from recovery
-              shouldRunAsync = true)
-            val metadataForUpdate = Metadata(
-              identifier = batchId,
-              kyuubiInstance = kyuubiInstance,
-              requestConf = batchSession.optimizedConf,
-              clusterManager = 
batchSession.batchJobSubmissionOp.builder.clusterManager())
-            metadataManager.updateMetadata(metadataForUpdate, 
asyncRetryOnError = false)
+              Some(metadata),
+              fromRecovery = false)
             val sessionHandle = sessionManager.openBatchSession(batchSession)
             var submitted = false
             while (!submitted) { // block until batch job submitted
@@ -113,7 +105,7 @@ class KyuubiBatchService(
               // }
               if (!submitted) Thread.sleep(1000)
             }
-            info(s"$batchId is submitted.")
+            info(s"$batchId is submitted or finished.")
         }
       }
     }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index d7e8b615a..12db68aeb 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -269,7 +269,6 @@ private[v1] class BatchesResource extends ApiRequestContext 
with Logging {
             userName,
             "anonymous",
             ipAddress,
-            request.getConf.asScala.toMap,
             request)
         } match {
           case Success(sessionHandle) =>
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
index 014bbced3..c8563bca1 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
@@ -41,10 +41,9 @@ class KyuubiBatchSession(
     batchName: Option[String],
     resource: String,
     className: String,
-    batchConf: Map[String, String],
     batchArgs: Seq[String],
-    recoveryMetadata: Option[Metadata] = None,
-    shouldRunAsync: Boolean)
+    metadata: Option[Metadata] = None,
+    fromRecovery: Boolean)
   extends KyuubiSession(
     TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1,
     user,
@@ -55,11 +54,11 @@ class KyuubiBatchSession(
   override val sessionType: SessionType = SessionType.BATCH
 
   override val handle: SessionHandle = {
-    val batchId = 
recoveryMetadata.map(_.identifier).getOrElse(conf(KYUUBI_BATCH_ID_KEY))
+    val batchId = 
metadata.map(_.identifier).getOrElse(conf(KYUUBI_BATCH_ID_KEY))
     SessionHandle.fromUUID(batchId)
   }
 
-  override def createTime: Long = 
recoveryMetadata.map(_.createTime).getOrElse(super.createTime)
+  override def createTime: Long = 
metadata.map(_.createTime).getOrElse(super.createTime)
 
   override def getNoOperationTime: Long = {
     if (batchJobSubmissionOp != null && !OperationState.isTerminal(
@@ -74,7 +73,7 @@ class KyuubiBatchSession(
     sessionManager.getConf.get(KyuubiConf.BATCH_SESSION_IDLE_TIMEOUT)
 
   override val normalizedConf: Map[String, String] =
-    sessionConf.getBatchConf(batchType) ++ 
sessionManager.validateBatchConf(batchConf)
+    sessionConf.getBatchConf(batchType) ++ 
sessionManager.validateBatchConf(conf)
 
   val optimizedConf: Map[String, String] = {
     val confOverlay = sessionManager.sessionConfAdvisor.getConfOverlay(
@@ -95,7 +94,7 @@ class KyuubiBatchSession(
 
   // whether the resource file is from uploading
   private[kyuubi] val isResourceUploaded: Boolean =
-    batchConf.getOrElse(KyuubiReservedKeys.KYUUBI_BATCH_RESOURCE_UPLOADED_KEY, 
"false").toBoolean
+    conf.getOrElse(KyuubiReservedKeys.KYUUBI_BATCH_RESOURCE_UPLOADED_KEY, 
"false").toBoolean
 
   private[kyuubi] lazy val batchJobSubmissionOp = 
sessionManager.operationManager
     .newBatchJobSubmissionOperation(
@@ -106,8 +105,7 @@ class KyuubiBatchSession(
       className,
       optimizedConf,
       batchArgs,
-      recoveryMetadata,
-      shouldRunAsync)
+      metadata)
 
   private def waitMetadataRequestsRetryCompletion(): Unit = {
     val batchId = batchJobSubmissionOp.batchId
@@ -122,7 +120,9 @@ class KyuubiBatchSession(
   }
 
   private val sessionEvent = KyuubiSessionEvent(this)
-  recoveryMetadata.foreach(metadata => sessionEvent.engineId = 
metadata.engineId)
+  if (fromRecovery) {
+    metadata.foreach { m => sessionEvent.engineId = m.engineId }
+  }
   EventBus.post(sessionEvent)
 
   override def getSessionEvent: Option[KyuubiSessionEvent] = {
@@ -142,32 +142,47 @@ class KyuubiBatchSession(
   override def open(): Unit = handleSessionException {
     traceMetricsOnOpen()
 
-    if (recoveryMetadata.isEmpty) {
+    lazy val kubernetesInfo: Map[String, String] = {
       val appMgrInfo = batchJobSubmissionOp.builder.appMgrInfo()
-      val kubernetesInfo = appMgrInfo.kubernetesInfo.context.map { context =>
+      appMgrInfo.kubernetesInfo.context.map { context =>
         Map(KyuubiConf.KUBERNETES_CONTEXT.key -> context)
       }.getOrElse(Map.empty) ++ appMgrInfo.kubernetesInfo.namespace.map { 
namespace =>
         Map(KyuubiConf.KUBERNETES_NAMESPACE.key -> namespace)
       }.getOrElse(Map.empty)
-      val metaData = Metadata(
-        identifier = handle.identifier.toString,
-        sessionType = sessionType,
-        realUser = realUser,
-        username = user,
-        ipAddress = ipAddress,
-        kyuubiInstance = connectionUrl,
-        state = OperationState.PENDING.toString,
-        resource = resource,
-        className = className,
-        requestName = name.orNull,
-        requestConf = optimizedConf ++ kubernetesInfo, // save the kubernetes 
info into request conf
-        requestArgs = batchArgs,
-        createTime = createTime,
-        engineType = batchType,
-        clusterManager = batchJobSubmissionOp.builder.clusterManager())
-
-      // there is a chance that operation failed w/ duplicated key error
-      sessionManager.insertMetadata(metaData)
+    }
+
+    (metadata, fromRecovery) match {
+      case (Some(initialMetadata), false) =>
+        // new batch job created using batch impl v2
+        val metadataToUpdate = Metadata(
+          identifier = initialMetadata.identifier,
+          kyuubiInstance = connectionUrl,
+          requestName = name.orNull,
+          requestConf = optimizedConf ++ kubernetesInfo, // save the 
kubernetes info
+          clusterManager = batchJobSubmissionOp.builder.clusterManager())
+        sessionManager.updateMetadata(metadataToUpdate)
+      case (None, _) =>
+        // new batch job created using batch impl v1
+        val newMetadata = Metadata(
+          identifier = handle.identifier.toString,
+          sessionType = sessionType,
+          realUser = realUser,
+          username = user,
+          ipAddress = ipAddress,
+          kyuubiInstance = connectionUrl,
+          state = OperationState.PENDING.toString,
+          resource = resource,
+          className = className,
+          requestName = name.orNull,
+          requestConf = optimizedConf ++ kubernetesInfo, // save the 
kubernetes info
+          requestArgs = batchArgs,
+          createTime = createTime,
+          engineType = batchType,
+          clusterManager = batchJobSubmissionOp.builder.clusterManager())
+
+        // there is a chance that operation failed w/ duplicated key error
+        sessionManager.insertMetadata(newMetadata)
+      case _ =>
     }
 
     checkSessionAccessPathURIs()
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 19259bb1b..8d3234699 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -144,10 +144,9 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
       batchName: Option[String],
       resource: String,
       className: String,
-      batchConf: Map[String, String],
       batchArgs: Seq[String],
-      recoveryMetadata: Option[Metadata] = None,
-      shouldRunAsync: Boolean): KyuubiBatchSession = {
+      metadata: Option[Metadata] = None,
+      fromRecovery: Boolean): KyuubiBatchSession = {
     // scalastyle:on
     val username = Option(user).filter(_.nonEmpty).getOrElse("anonymous")
     val sessionConf = this.getConf.getUserDefaults(user)
@@ -162,10 +161,9 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
       batchName,
       resource,
       className,
-      batchConf,
       batchArgs,
-      recoveryMetadata,
-      shouldRunAsync)
+      metadata,
+      fromRecovery)
   }
 
   private[kyuubi] def openBatchSession(batchSession: KyuubiBatchSession): 
SessionHandle = {
@@ -202,22 +200,19 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
       user: String,
       password: String,
       ipAddress: String,
-      conf: Map[String, String],
-      batchRequest: BatchRequest,
-      shouldRunAsync: Boolean = true): SessionHandle = {
+      batchRequest: BatchRequest): SessionHandle = {
     val batchSession = createBatchSession(
       user,
       password,
       ipAddress,
-      conf,
+      batchRequest.getConf.asScala.toMap,
       batchRequest.getBatchType,
       Option(batchRequest.getName),
       batchRequest.getResource,
       batchRequest.getClassName,
-      batchRequest.getConf.asScala.toMap,
       batchRequest.getArgs.asScala.toSeq,
       None,
-      shouldRunAsync)
+      fromRecovery = false)
     openBatchSession(batchSession)
   }
 
@@ -313,10 +308,9 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
           Option(metadata.requestName),
           metadata.resource,
           metadata.className,
-          metadata.requestConf,
           metadata.requestArgs,
           Some(metadata),
-          shouldRunAsync = true)
+          fromRecovery = true)
       }).getOrElse(Seq.empty)
     }
   }
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
index 7a4bfea1b..e4382a859 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
@@ -116,7 +116,6 @@ class KyuubiOperationYarnClusterSuite extends 
WithKyuubiServerOnYarn with HiveJD
       "kyuubi",
       "passwd",
       "localhost",
-      batchRequest.getConf.asScala.toMap,
       batchRequest)
 
     val session = 
sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSession]
@@ -180,7 +179,6 @@ class KyuubiOperationYarnClusterSuite extends 
WithKyuubiServerOnYarn with HiveJD
       "kyuubi",
       "passwd",
       "localhost",
-      batchRequest.getConf.asScala.toMap,
       batchRequest)
 
     val session = 
sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSession]
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerJsonLoggingEventHandlerSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerJsonLoggingEventHandlerSuite.scala
index 7c79d6a87..2f794ed48 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerJsonLoggingEventHandlerSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerJsonLoggingEventHandlerSuite.scala
@@ -135,13 +135,12 @@ class ServerJsonLoggingEventHandlerSuite extends 
WithKyuubiServer with HiveJDBCT
       }
     }
 
-    val batchRequest = newSparkBatchRequest()
+    val batchRequest = newSparkBatchRequest(Map(KYUUBI_BATCH_ID_KEY -> 
UUID.randomUUID().toString))
     val sessionMgr = 
server.backendService.sessionManager.asInstanceOf[KyuubiSessionManager]
     val batchSessionHandle = sessionMgr.openBatchSession(
       Utils.currentUser,
       "kyuubi",
       "127.0.0.1",
-      Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString),
       batchRequest)
     withSessionConf()(Map.empty)(Map("spark.sql.shuffle.partitions" -> "2")) {
       withJdbcStatement() { statement =>
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
index 504b9ac26..3a47461a2 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
@@ -358,12 +358,12 @@ abstract class BatchesResourceSuiteBase extends 
KyuubiFunSuite
       "kyuubi",
       "kyuubi",
       InetAddress.getLocalHost.getCanonicalHostName,
-      Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString),
       newBatchRequest(
         "spark",
         sparkBatchTestResource.get,
         "",
-        ""))
+        "",
+        Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString)))
     sessionManager.openSession(
       TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V11,
       "",
@@ -380,22 +380,22 @@ abstract class BatchesResourceSuiteBase extends 
KyuubiFunSuite
       "kyuubi",
       "kyuubi",
       InetAddress.getLocalHost.getCanonicalHostName,
-      Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString),
       newBatchRequest(
         "spark",
         sparkBatchTestResource.get,
         "",
-        ""))
+        "",
+        Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString)))
     sessionManager.openBatchSession(
       "kyuubi",
       "kyuubi",
       InetAddress.getLocalHost.getCanonicalHostName,
-      Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString),
       newBatchRequest(
         "spark",
         sparkBatchTestResource.get,
         "",
-        ""))
+        "",
+        Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString)))
 
     val response2 = webTarget.path("api/v1/batches")
       .queryParam("batchType", "spark")
@@ -780,12 +780,14 @@ abstract class BatchesResourceSuiteBase extends 
KyuubiFunSuite
       .be.sessionManager.asInstanceOf[KyuubiSessionManager]
 
     val e = intercept[Exception] {
+      val conf = Map(
+        KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString,
+        "spark.jars" -> "disAllowPath")
       sessionManager.openBatchSession(
         "kyuubi",
         "kyuubi",
         InetAddress.getLocalHost.getCanonicalHostName,
-        Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString),
-        newSparkBatchRequest(Map("spark.jars" -> "disAllowPath")))
+        newSparkBatchRequest(conf))
     }
     val sessionHandleRegex = "\\[\\S*]".r
     val batchId = 
sessionHandleRegex.findFirstMatchIn(e.getMessage).get.group(0)
@@ -803,12 +805,12 @@ abstract class BatchesResourceSuiteBase extends 
KyuubiFunSuite
       "kyuubi",
       "kyuubi",
       InetAddress.getLocalHost.getCanonicalHostName,
-      Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString),
       newBatchRequest(
         "spark",
         sparkBatchTestResource.get,
         "",
-        uniqueName))
+        uniqueName,
+        Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString)))
 
     val response = webTarget.path("api/v1/batches")
       .queryParam("batchName", uniqueName)
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
index 0c44fc3a8..bcf8c450e 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
@@ -290,12 +290,12 @@ class BatchCliSuite extends RestClientTestHelper with 
TestPrematureExit with Bat
       "kyuubi",
       "kyuubi",
       InetAddress.getLocalHost.getCanonicalHostName,
-      Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString),
       newBatchRequest(
         "spark",
         "",
         "",
-        ""))
+        "",
+        Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString)))
     sessionManager.openSession(
       TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V11,
       "",
@@ -312,22 +312,22 @@ class BatchCliSuite extends RestClientTestHelper with 
TestPrematureExit with Bat
       "kyuubi",
       "kyuubi",
       InetAddress.getLocalHost.getCanonicalHostName,
-      Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString),
       newBatchRequest(
         "spark",
         "",
         "",
-        ""))
+        "",
+        Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString)))
     sessionManager.openBatchSession(
       "kyuubi",
       "kyuubi",
       InetAddress.getLocalHost.getCanonicalHostName,
-      Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString),
       newBatchRequest(
         "spark",
         "",
         "",
-        ""))
+        "",
+        Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString)))
 
     val listArgs = Array(
       "list",

Reply via email to