This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new d5a493af7 [KYUUBI #4790] Initial implement Batch V2
d5a493af7 is described below

commit d5a493af7d8142a8819187d0af24042c1b2335da
Author: Cheng Pan <[email protected]>
AuthorDate: Mon Jul 31 18:18:11 2023 +0800

    [KYUUBI #4790] Initial implement Batch V2
    
    ### _Why are the changes needed?_
    
    #### How is it done today?
    
    The current procedure of the Batch Job API, referred to as V1, is illustrated below.
    
    ##### CREATE batch job procedure in Batch V1
    
    ```mermaid
    sequenceDiagram
    participant Client
    participant Server
    participant Metastore
    participant RM
    
    Client ->> Server : Create Batch Job
    Server ->> Server : Create Batch Operator
    Server ->> Metastore : Persist Job metadata (PENDING)
    Server ->> Server : Put Batch Operator into Execution thread pool
    Server ->> Client : Batch Job Info
    Server ->> RM : Submit Application (in Execution thread pool)
    loop Application Check
        Server ->> RM : Query Application Status
        Server ->> Metastore : Update Batch Status
    end
    ```
    
    ##### GET batch job info procedure in Batch V1
    
    ```mermaid
    sequenceDiagram
    participant Client
    participant Server
    participant Metastore
    participant RM
    
    Client ->> Server : Query Batch Job Info
    alt KyuubiInstance matched
        Server ->> Client : Batch Job Info
    else
        Server ->> Server : Forward Request to expected KyuubiInstance
    end
    ```
    
    <!--
    ```mermaid
    sequenceDiagram
    participant Client
    participant Server
    participant Metastore
    participant RM
    
    Client ->> Server : Fetch Batch Job logs
    alt KyuubiInstance matched
        Server ->> Client : Batch Job logs
    else
        Server ->> Server : Forward Request to expected KyuubiInstance
    end
    
    Client ->> Server : Close Batch Job
    alt KyuubiInstance matched
        Server ->> RM : Close the Application
        Server ->> Metastore : Update Batch Status
        Server ->> Client : Closed Batch Job Info
    else
        Server ->> Server : Forward Request to expected KyuubiInstance
    end
    ```
    -->
    
    #### What is new in your approach?
    
    This PR proposes a new batch job submission procedure, referred to as V2.
    
    ##### CREATE batch job procedure in Batch V2
    
    ```mermaid
    sequenceDiagram
    participant Client
    participant Server
    participant Metastore
    participant RM
    
    Client ->> Server : Create Batch Job
    Server ->> Metastore : Persist Job metadata (INITIALIZED)
    Server ->> Client : Batch Job Info
    
    loop Forever in dedicated thread pool
        Server ->> Metastore : Pick up and lock INITIALIZED job
        Server ->> RM : Submit Application
        Server ->> RM : Query Application Status
        Server ->> Metastore : Update Batch Status
    end
    ```
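
    Below is a simplified, self-contained sketch (in Scala) of how such a dedicated
    submitter loop can work. The types and method names here are illustrative
    placeholders, not Kyuubi's real API; the actual logic lives in `KyuubiBatchService`
    in this PR's diff.

    ```scala
    import java.util.concurrent.Executors
    import java.util.concurrent.atomic.AtomicBoolean

    // Hypothetical, trimmed-down interfaces standing in for the metastore and
    // session layers; they exist only to make the control flow readable.
    case class BatchMetadata(identifier: String)

    trait MetadataClient {
      // Atomically pick one INITIALIZED batch and lock it (state -> PENDING).
      def pickBatchForSubmitting(kyuubiInstance: String): Option[BatchMetadata]
      def isTerminal(batchId: String): Boolean
    }

    trait BatchSubmitter {
      def submit(metadata: BatchMetadata): Unit // build the batch session, submit to RM
    }

    class BatchSubmitterLoop(
        kyuubiInstance: String,
        metadata: MetadataClient,
        submitter: BatchSubmitter,
        threads: Int) {

      private val running = new AtomicBoolean(true)
      private val pool = Executors.newFixedThreadPool(threads)

      def start(): Unit = (0 until threads).foreach { _ =>
        pool.submit(new Runnable {
          override def run(): Unit = while (running.get) {
            metadata.pickBatchForSubmitting(kyuubiInstance) match {
              case None => Thread.sleep(1000) // nothing queued, back off briefly
              case Some(batch) =>
                submitter.submit(batch)
                // occupy this submitter thread until the batch terminates
                while (!metadata.isTerminal(batch.identifier)) Thread.sleep(1000)
            }
          }
        })
      }

      def stop(): Unit = {
        running.set(false)
        pool.shutdown()
      }
    }
    ```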
    
    ##### GET batch job info procedure in Batch V2
    
    ```mermaid
    sequenceDiagram
    participant Client
    participant Server
    participant Metastore
    participant RM
    
    Client ->> Server : Query Batch Job Info
    Server ->> Metastore : Query Batch Job Info
    Server ->> Client : Batch Job Info
    ```
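
    A minimal sketch of the corresponding GET handler, again with hypothetical names:
    the batch info is read straight from the shared metastore, so any Kyuubi server
    can answer the request without forwarding it to the instance that owns the batch.

    ```scala
    // Illustrative types only; not Kyuubi's real REST or metastore API.
    case class BatchInfo(id: String, state: String, kyuubiInstance: Option[String])

    trait MetadataReader {
      def getBatch(id: String): Option[BatchInfo]
    }

    class BatchV2GetHandler(reader: MetadataReader) {
      def getBatch(batchId: String): BatchInfo =
        reader.getBatch(batchId)
          .getOrElse(throw new NoSuchElementException(s"Invalid batchId: $batchId"))
    }
    ```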
    
    <!--
    ```mermaid
    sequenceDiagram
    participant Client
    participant Server
    participant Metastore
    participant RM
    
    Client ->> Server : Fetch Batch Job logs
    alt KyuubiInstance matched
        Server ->> Client : Batch Job logs
    else
        Server ->> Server : Forward Request to expected KyuubiInstance
    end
    
    Client ->> Server : Close Batch Job
    alt KyuubiInstance matched
        Server ->> RM : Close the Application
        Server ->> Metastore : Update Batch Status
        Server ->> Client : Closed Batch Job Info
    else
        Server ->> Server : Forward Request to expected KyuubiInstance
    end
    ```
    -->
    
    #### What are the limits of current practice, and why do you think it will be successful?
    
    Pros:
    
    1. The CREATE request becomes lightweight and returns faster. In V1, we struggled with whether the response should wait for the engine to be submitted to the RM, and with how to report the not-yet-submitted job status to the client; in V2, the CREATE request simply inserts a new record into the metastore and returns with the INITIALIZED state (see the sketch after this list).
    2. In common practice, a Kyuubi server cluster is deployed behind a load balancer, and the load balancer does not know the real load of each Kyuubi server. If it forwards requests using Random/RoundRobin/IPHash policies, the existing Batch V1 implementation may leave some Kyuubi servers heavily loaded while others are nearly idle, because the server that receives the request always performs the batch submission itself; in V2, each Kyuubi server can easily measure its own load, e.g. by CPU [...]
    3. In V1, metrics are mostly isolated per Kyuubi server; in V2, it is easy to expose global batch job metrics when a shared storage is used as the metastore backend, e.g. how many batches are queued in the metastore and how many batches are managed by each Kyuubi server, either by querying the metastore backend directly or via the metrics exposed by each Kyuubi server.
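
    The "light" CREATE path referenced in item 1 can be sketched as follows
    (hypothetical names, not Kyuubi's real API): the handler only persists an
    INITIALIZED record and returns immediately; submission to the RM is left to the
    background submitter shown earlier.

    ```scala
    case class BatchRecord(id: String, state: String, owner: String)

    trait BatchStore {
      def insert(record: BatchRecord): Unit // may fail with a duplicated-key error
      def get(id: String): Option[BatchRecord]
    }

    class BatchV2CreateHandler(store: BatchStore) {
      def create(batchId: String, user: String): BatchRecord = {
        store.insert(BatchRecord(batchId, state = "INITIALIZED", owner = user))
        // respond immediately with the INITIALIZED record; no RM interaction here
        store.get(batchId).getOrElse(sys.error(s"batch $batchId missing from metastore"))
      }
    }
    ```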
    
    Cons:
    
    1. V1 assumes the Kyuubi server can tolerate a long metastore outage, while V2 strictly depends on the availability of the metastore. However, we can move the existing forwarding logic and async retry logic into the `Metastore` implementation to overcome this regression.
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4790 from pan3793/batch-v2.
    
    Closes #4790
    
    860698ad6 [Cheng Pan] BATCH_IMPL_VERSION
    b9c68aa2f [Cheng Pan] kyuubi.batch.impl.version
    17e4f199a [Cheng Pan] submitter.threads=100
    7c0bdb0c1 [Cheng Pan] Initial implement Batch v2
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../org/apache/kyuubi/config/KyuubiConf.scala      |  35 +++
 .../org/apache/kyuubi/client/api/v1/dto/Count.java |  58 +++++
 .../apache/kyuubi/server/KyuubiBatchService.scala  | 129 ++++++++++
 .../org/apache/kyuubi/server/KyuubiServer.scala    |   8 +-
 .../scala/org/apache/kyuubi/server/api/api.scala   |   7 +-
 .../kyuubi/server/api/v1/AdminResource.scala       |  27 ++-
 .../kyuubi/server/api/v1/BatchesResource.scala     |  47 +++-
 .../kyuubi/server/metadata/MetadataManager.scala   |  21 ++
 .../kyuubi/server/metadata/MetadataStore.scala     |  24 ++
 .../server/metadata/jdbc/JDBCMetadataStore.scala   |  62 +++++
 .../kyuubi/session/KyuubiSessionManager.scala      |  30 +++
 kyuubi-server/src/test/resources/log4j2-test.xml   |   4 +
 .../server/api/v1/BatchesResourceSuite.scala       | 266 +++++++++++++--------
 13 files changed, 615 insertions(+), 103 deletions(-)

diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 44e3fcd17..175bf79c6 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1612,6 +1612,41 @@ object KyuubiConf {
       .booleanConf
       .createWithDefault(true)
 
+  val BATCH_IMPL_VERSION: ConfigEntry[String] =
+    buildConf("kyuubi.batch.impl.version")
+      .internal
+      .serverOnly
+      .doc("Batch API version, candidates: 1, 2. " +
+        "Note: Batch API v2 is experimental and under rapid development, this 
configuration " +
+        "is added to allow explorers conveniently testing the developing Batch 
v2 API, not " +
+        "intended exposing to end users, it may be removed in anytime.")
+      .version("1.8.0")
+      .stringConf
+      .createWithDefault("1")
+
+  val BATCH_SUBMITTER_ENABLED: ConfigEntry[Boolean] =
+    buildConf("kyuubi.batch.submitter.enabled")
+      .internal
+      .serverOnly
+      .doc("When Batch API v2 is enabled, Kyuubi server requires to pick the 
INITIALIZED " +
+        "batch job from metastore and submits it to Resource Manager. " +
+        "Note: Batch API v2 is experimental and under rapid development, this 
configuration " +
+        "is added to allow explorers conveniently testing the developing Batch 
v2 API, not " +
+        "intended exposing to end users, it may be removed in anytime.")
+      .version("1.8.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  val BATCH_SUBMITTER_THREADS: ConfigEntry[Int] =
+    buildConf("kyuubi.batch.submitter.threads")
+      .internal
+      .serverOnly
+      .doc("Number of threads in batch job submitter, this configuration only 
take effects " +
+        s"when ${BATCH_SUBMITTER_ENABLED.key} is enabled")
+      .version("1.8.0")
+      .intConf
+      .createWithDefault(100)
+
   val SERVER_EXEC_POOL_SIZE: ConfigEntry[Int] =
     buildConf("kyuubi.backend.server.exec.pool.size")
       .doc("Number of threads in the operation execution thread pool of Kyuubi 
server")
diff --git 
a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/Count.java
 
b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/Count.java
new file mode 100644
index 000000000..8f77ccd13
--- /dev/null
+++ 
b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/Count.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.client.api.v1.dto;
+
+import java.util.Objects;
+import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+public class Count {
+  private Integer count;
+
+  public Count() {}
+
+  public Count(Integer count) {
+    this.count = count;
+  }
+
+  public Integer getCount() {
+    return count;
+  }
+
+  public void setCount(Integer count) {
+    this.count = count;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    Count that = (Count) o;
+    return Objects.equals(getCount(), that.getCount());
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(getCount());
+  }
+
+  @Override
+  public String toString() {
+    return ReflectionToStringBuilder.toString(this, ToStringStyle.JSON_STYLE);
+  }
+}
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
new file mode 100644
index 000000000..250662835
--- /dev/null
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.kyuubi.config.KyuubiConf.BATCH_SUBMITTER_THREADS
+import org.apache.kyuubi.operation.OperationState
+import org.apache.kyuubi.server.metadata.MetadataManager
+import org.apache.kyuubi.server.metadata.api.Metadata
+import org.apache.kyuubi.service.{AbstractService, Serverable}
+import org.apache.kyuubi.session.KyuubiSessionManager
+import org.apache.kyuubi.util.ThreadUtils
+
+class KyuubiBatchService(
+    server: Serverable,
+    sessionManager: KyuubiSessionManager)
+  extends AbstractService(classOf[KyuubiBatchService].getSimpleName) {
+
+  private lazy val restFrontend = server.frontendServices
+    .filter(_.isInstanceOf[KyuubiRestFrontendService])
+    .head
+
+  private def kyuubiInstance: String = restFrontend.connectionUrl
+
+  // TODO expose metrics, including pending/running/succeeded/failed batches
+  // TODO handle dangling batches, e.g. batch is picked and changed state to 
pending,
+  //      but the Server crashed before submitting or updating status to 
metastore
+
+  private lazy val metadataManager: MetadataManager = 
sessionManager.metadataManager.get
+  private val running: AtomicBoolean = new AtomicBoolean(false)
+  private lazy val batchExecutor = ThreadUtils
+    .newDaemonFixedThreadPool(conf.get(BATCH_SUBMITTER_THREADS), 
"kyuubi-batch-submitter")
+
+  def cancelUnscheduledBatch(batchId: String): Boolean = {
+    metadataManager.cancelUnscheduledBatch(batchId)
+  }
+
+  def countBatch(
+      batchType: String,
+      batchUser: Option[String],
+      batchState: Option[String] = None,
+      kyuubiInstance: Option[String] = None): Int = {
+    metadataManager.countBatch(
+      batchType,
+      batchUser.orNull,
+      batchState.orNull,
+      kyuubiInstance.orNull)
+  }
+
+  override def start(): Unit = {
+    assert(running.compareAndSet(false, true))
+    val submitTask: Runnable = () => {
+      while (running.get) {
+        metadataManager.pickBatchForSubmitting(kyuubiInstance) match {
+          case None => Thread.sleep(1000)
+          case Some(metadata) =>
+            val batchId = metadata.identifier
+            info(s"$batchId is picked for submission.")
+            val batchSession = sessionManager.createBatchSession(
+              metadata.username,
+              "anonymous",
+              metadata.ipAddress,
+              metadata.requestConf,
+              metadata.engineType,
+              Option(metadata.requestName),
+              metadata.resource,
+              metadata.className,
+              metadata.requestConf,
+              metadata.requestArgs,
+              Some(metadata), // TODO some logic need to fix since it's not 
from recovery
+              shouldRunAsync = true)
+            val metadataForUpdate = Metadata(
+              identifier = batchId,
+              kyuubiInstance = kyuubiInstance,
+              requestConf = batchSession.optimizedConf,
+              clusterManager = 
batchSession.batchJobSubmissionOp.builder.clusterManager())
+            metadataManager.updateMetadata(metadataForUpdate, 
asyncRetryOnError = false)
+            val sessionHandle = sessionManager.openBatchSession(batchSession)
+            var terminated = false
+            while (!terminated) { // block until batch job finished
+              terminated = sessionManager.getBatchSession(sessionHandle).map { 
batchSession =>
+                val batchOp = batchSession.batchJobSubmissionOp
+                OperationState.isTerminal(batchOp.getStatus.state)
+              }.getOrElse {
+                error(s"Batch Session $batchId is not existed, marked as 
finished")
+                true
+              }
+              // should we always treat metastore as the single of truth?
+              //
+              // terminated = metadataManager.getBatchSessionMetadata(batchId) 
match {
+              //   case Some(metadata) =>
+              //     
OperationState.isTerminal(OperationState.withName(metadata.state))
+              //   case None =>
+              //     error(s"$batchId is not existed in metastore, assume it 
is finished")
+              //     true
+              // }
+              if (!terminated) Thread.sleep(1000)
+            }
+            info(s"$batchId is finished.")
+        }
+      }
+    }
+    (0 until batchExecutor.getCorePoolSize).foreach(_ => 
batchExecutor.submit(submitTask))
+    super.start()
+  }
+
+  override def stop(): Unit = {
+    super.stop()
+    if (running.compareAndSet(true, false)) {
+      ThreadUtils.shutdown(batchExecutor)
+    }
+  }
+}
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
index 12120ea55..2fc0475a3 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.kyuubi._
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.{FRONTEND_PROTOCOLS, 
FrontendProtocols, KYUUBI_KUBERNETES_CONF_PREFIX}
+import org.apache.kyuubi.config.KyuubiConf.{BATCH_SUBMITTER_ENABLED, 
FRONTEND_PROTOCOLS, FrontendProtocols, KYUUBI_KUBERNETES_CONF_PREFIX}
 import org.apache.kyuubi.config.KyuubiConf.FrontendProtocols._
 import org.apache.kyuubi.events.{EventBus, KyuubiServerInfoEvent, 
ServerEventHandlerRegister}
 import org.apache.kyuubi.ha.HighAvailabilityConf._
@@ -188,6 +188,12 @@ class KyuubiServer(name: String) extends Serverable(name) {
     if (conf.get(MetricsConf.METRICS_ENABLED)) {
       addService(new MetricsSystem)
     }
+
+    if (conf.isRESTEnabled && conf.get(BATCH_SUBMITTER_ENABLED)) {
+      addService(new KyuubiBatchService(
+        this,
+        backendService.sessionManager.asInstanceOf[KyuubiSessionManager]))
+    }
     super.initialize(conf)
 
     initLoggerEventHandler(conf)
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/api.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/api.scala
index b56131542..93953a577 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/api.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/api.scala
@@ -26,7 +26,7 @@ import javax.ws.rs.ext.{ExceptionMapper, Provider}
 import org.eclipse.jetty.server.handler.ContextHandler
 
 import org.apache.kyuubi.Logging
-import org.apache.kyuubi.server.KyuubiRestFrontendService
+import org.apache.kyuubi.server.{KyuubiBatchService, 
KyuubiRestFrontendService, KyuubiServer}
 
 private[api] trait ApiRequestContext {
 
@@ -36,6 +36,11 @@ private[api] trait ApiRequestContext {
   @Context
   protected var httpRequest: HttpServletRequest = _
 
+  protected lazy val batchService: Option[KyuubiBatchService] =
+    KyuubiServer.kyuubiServer.getServices
+      .find(_.isInstanceOf[KyuubiBatchService])
+      .map(_.asInstanceOf[KyuubiBatchService])
+
   final protected def fe: KyuubiRestFrontendService = 
FrontendServiceContext.get(servletContext)
 }
 
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
index a0e136e5e..9613b3ef1 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
@@ -30,7 +30,7 @@ import io.swagger.v3.oas.annotations.tags.Tag
 import org.apache.commons.lang3.StringUtils
 
 import org.apache.kyuubi.{KYUUBI_VERSION, Logging, Utils}
-import org.apache.kyuubi.client.api.v1.dto.{Engine, OperationData, ServerData, 
SessionData}
+import org.apache.kyuubi.client.api.v1.dto._
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.ha.HighAvailabilityConf.HA_NAMESPACE
@@ -384,6 +384,31 @@ private[v1] class AdminResource extends ApiRequestContext 
with Logging {
       engine.getSubdomain)
   }
 
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(
+      mediaType = MediaType.APPLICATION_JSON,
+      schema = new Schema(implementation = classOf[Count]))),
+    description = "get the batch count")
+  @GET
+  @Path("batch/count")
+  def countBatch(
+      @QueryParam("batchType") @DefaultValue("SPARK") batchType: String,
+      @QueryParam("batchUser") batchUser: String,
+      @QueryParam("batchState") batchState: String): Count = {
+    val userName = fe.getSessionUser(Map.empty[String, String])
+    val ipAddress = fe.getIpAddress
+    info(s"Received counting batches request from $userName/$ipAddress")
+    if (!isAdministrator(userName)) {
+      throw new NotAllowedException(
+        s"$userName is not allowed to count the batches")
+    }
+    val batchCount = batchService
+      .map(_.countBatch(batchType, Option(batchUser), Option(batchState)))
+      .getOrElse(0)
+    new Count(batchCount)
+  }
+
   private def isAdministrator(userName: String): Boolean = {
     administrators.contains(userName)
   }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index e5ac23905..3cc98374f 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -58,6 +58,10 @@ private[v1] class BatchesResource extends ApiRequestContext 
with Logging {
   private lazy val internalConnectTimeout =
     fe.getConf.get(BATCH_INTERNAL_REST_CLIENT_CONNECT_TIMEOUT).toInt
 
+  private def batchV2Enabled(reqConf: Map[String, String]): Boolean = {
+    reqConf.getOrElse(BATCH_IMPL_VERSION.key, 
fe.getConf.get(BATCH_IMPL_VERSION)) == "2"
+  }
+
   private def getInternalRestClient(kyuubiInstance: String): 
InternalRestClient = {
     internalRestClients.computeIfAbsent(
       kyuubiInstance,
@@ -233,6 +237,30 @@ private[v1] class BatchesResource extends 
ApiRequestContext with Logging {
             KYUUBI_SESSION_CONNECTION_URL_KEY -> fe.connectionUrl,
             KYUUBI_SESSION_REAL_USER_KEY -> fe.getRealUser())).asJava)
 
+        if (batchV2Enabled(request.getConf.asScala.toMap)) {
+          logger.info(s"Submit batch job $batchId using Batch API v2")
+          return Try {
+            sessionManager.initializeBatchState(
+              userName,
+              ipAddress,
+              request.getConf.asScala.toMap,
+              request)
+          } match {
+            case Success(batchId) =>
+              sessionManager.getBatchFromMetadataStore(batchId) match {
+                case Some(batch) => batch
+                case None => throw new IllegalStateException(
+                    s"can not find batch $batchId from metadata store")
+              }
+            case Failure(cause) if JdbcUtils.isDuplicatedKeyDBErr(cause) =>
+              sessionManager.getBatchFromMetadataStore(batchId) match {
+                case Some(batch) => markDuplicated(batch)
+                case None => throw new IllegalStateException(
+                    s"can not find duplicated batch $batchId from metadata 
store")
+              }
+          }
+        }
+
         Try {
           sessionManager.openBatchSession(
             userName,
@@ -278,7 +306,8 @@ private[v1] class BatchesResource extends ApiRequestContext 
with Logging {
       buildBatch(batchSession)
     }.getOrElse {
       sessionManager.getBatchMetadata(batchId).map { metadata =>
-        if (OperationState.isTerminal(OperationState.withName(metadata.state)) 
||
+        if (batchV2Enabled(metadata.requestConf) ||
+          OperationState.isTerminal(OperationState.withName(metadata.state)) ||
           metadata.kyuubiInstance == fe.connectionUrl) {
           MetadataManager.buildBatch(metadata)
         } else {
@@ -376,7 +405,11 @@ private[v1] class BatchesResource extends 
ApiRequestContext with Logging {
       }
     }.getOrElse {
       sessionManager.getBatchMetadata(batchId).map { metadata =>
-        if (fe.connectionUrl != metadata.kyuubiInstance) {
+        if (batchV2Enabled(metadata.requestConf) && metadata.state == 
"INITIALIZED") {
+          info(s"Batch $batchId is waiting for scheduling")
+          val dummyLogs = List(s"Batch $batchId is waiting for 
scheduling").asJava
+          new OperationLog(dummyLogs, dummyLogs.size)
+        } else if (fe.connectionUrl != metadata.kyuubiInstance) {
           val internalRestClient = 
getInternalRestClient(metadata.kyuubiInstance)
           internalRestClient.getBatchLocalLog(userName, batchId, from, size)
         } else {
@@ -430,6 +463,16 @@ private[v1] class BatchesResource extends 
ApiRequestContext with Logging {
         checkPermission(userName, metadata.username)
         if 
(OperationState.isTerminal(OperationState.withName(metadata.state))) {
           new CloseBatchResponse(false, s"The batch[$metadata] has been 
terminated.")
+        } else if (batchV2Enabled(metadata.requestConf) && metadata.state == 
"INITIALIZED") {
+          if (batchService.get.cancelUnscheduledBatch(batchId)) {
+            new CloseBatchResponse(true, s"Unscheduled batch $batchId is 
canceled.")
+          } else if 
(OperationState.isTerminal(OperationState.withName(metadata.state))) {
+            new CloseBatchResponse(false, s"The batch[$metadata] has been 
terminated.")
+          } else {
+            info(s"Cancel batch[$batchId] with state ${metadata.state} by 
killing application")
+            val (killed, msg) = forceKill(metadata.appMgrInfo, batchId)
+            new CloseBatchResponse(killed, msg)
+          }
         } else if (metadata.kyuubiInstance != fe.connectionUrl) {
           info(s"Redirecting delete batch[$batchId] to 
${metadata.kyuubiInstance}")
           val internalRestClient = 
getInternalRestClient(metadata.kyuubiInstance)
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
index e2648076d..daa3451d9 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala
@@ -138,6 +138,27 @@ class MetadataManager extends 
AbstractService("MetadataManager") {
       buildBatch)
   }
 
+  def countBatch(
+      batchType: String,
+      batchUser: String,
+      batchState: String,
+      kyuubiInstance: String): Int = {
+    val filter = MetadataFilter(
+      sessionType = SessionType.BATCH,
+      engineType = batchType,
+      username = batchUser,
+      state = batchState,
+      kyuubiInstance = kyuubiInstance)
+    withMetadataRequestMetrics(_metadataStore.countMetadata(filter))
+  }
+
+  def pickBatchForSubmitting(kyuubiInstance: String): Option[Metadata] =
+    withMetadataRequestMetrics(_metadataStore.pickMetadata(kyuubiInstance))
+
+  def cancelUnscheduledBatch(batchId: String): Boolean = {
+    _metadataStore.transformMetadataState(batchId, "INITIALIZED", "CANCELED")
+  }
+
   def getBatchesRecoveryMetadata(
       state: String,
       kyuubiInstance: String,
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataStore.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataStore.scala
index 4416c4a6d..1eabb224d 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataStore.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataStore.scala
@@ -28,6 +28,23 @@ trait MetadataStore extends Closeable {
    */
   def insertMetadata(metadata: Metadata): Unit
 
+  /**
+   * Find unscheduled batch job metadata and pick up it to submit.
+   * @param kyuubiInstance the Kyuubi instance picked batch job
+   * @return selected metadata for submitting or None if no sufficient items
+   */
+  def pickMetadata(kyuubiInstance: String): Option[Metadata]
+
+  /**
+   * Transfer state of metadata from the existing state to another
+   * @param identifier the identifier.
+   * @param fromState the desired current state
+   * @param targetState the desired target state
+   * @return `true` if the metadata state was same as `fromState`, and 
successfully
+   *         transitioned to `targetState`, otherwise `false` is returned
+   */
+  def transformMetadataState(identifier: String, fromState: String, 
targetState: String): Boolean
+
   /**
    * Get the persisted metadata by batch identifier.
    * @param identifier the identifier.
@@ -50,6 +67,13 @@ trait MetadataStore extends Closeable {
       size: Int,
       stateOnly: Boolean): Seq[Metadata]
 
+  /**
+   * Count the metadata list with filter conditions.
+   * @param filter the metadata filter conditions.
+   * @return the count of metadata satisfied the filter condition.
+   */
+  def countMetadata(filter: MetadataFilter): Int
+
   /**
    * Update the metadata according to identifier.
    * Note that, it will only update the state and engine related metadata.
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala
index 798e52d39..0255aadfd 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala
@@ -194,6 +194,43 @@ class JDBCMetadataStore(conf: KyuubiConf) extends 
MetadataStore with Logging {
     }
   }
 
+  override def pickMetadata(kyuubiInstance: String): Option[Metadata] = 
synchronized {
+    JdbcUtils.executeQueryWithRowMapper(
+      s"""SELECT identifier FROM $METADATA_TABLE
+         |WHERE state=?
+         |ORDER BY create_time DESC LIMIT 1
+         |""".stripMargin) { stmt =>
+      stmt.setString(1, OperationState.INITIALIZED.toString)
+    } { resultSet =>
+      resultSet.getString(1)
+    }.headOption.filter { preSelectedBatchId =>
+      JdbcUtils.executeUpdate(
+        s"""UPDATE $METADATA_TABLE
+           |SET kyuubi_instance=?, state=?
+           |WHERE identifier=? AND state=?
+           |""".stripMargin) { stmt =>
+        stmt.setString(1, kyuubiInstance)
+        stmt.setString(2, OperationState.PENDING.toString)
+        stmt.setString(3, preSelectedBatchId)
+        stmt.setString(4, OperationState.INITIALIZED.toString)
+      } == 1
+    }.map { pickedBatchId =>
+      getMetadata(pickedBatchId, stateOnly = false)
+    }
+  }
+
+  override def transformMetadataState(
+      identifier: String,
+      fromState: String,
+      targetState: String): Boolean = {
+    val query = s"UPDATE $METADATA_TABLE SET state = ? WHERE identifier = ? 
AND state = ?"
+    JdbcUtils.withConnection { connection =>
+      withUpdateCount(connection, query, fromState, identifier, targetState) { 
updateCount =>
+        updateCount == 1
+      }
+    }
+  }
+
   override def getMetadata(identifier: String, stateOnly: Boolean): Metadata = 
{
     val query =
       if (stateOnly) {
@@ -230,6 +267,19 @@ class JDBCMetadataStore(conf: KyuubiConf) extends 
MetadataStore with Logging {
     }
   }
 
+  override def countMetadata(filter: MetadataFilter): Int = {
+    val queryBuilder = new StringBuilder
+    val params = ListBuffer[Any]()
+    queryBuilder.append(s"SELECT COUNT(1) FROM $METADATA_TABLE")
+    queryBuilder.append(s" ${assembleWhereClause(filter, params)}")
+    val query = queryBuilder.toString
+    JdbcUtils.executeQueryWithRowMapper(query) { stmt =>
+      setStatementParams(stmt, params)
+    } { resultSet =>
+      resultSet.getInt(1)
+    }.head
+  }
+
   private def assembleWhereClause(
       filter: MetadataFilter,
       params: ListBuffer[Any]): String = {
@@ -280,10 +330,22 @@ class JDBCMetadataStore(conf: KyuubiConf) extends 
MetadataStore with Logging {
 
     queryBuilder.append(s"UPDATE $METADATA_TABLE")
     val setClauses = ListBuffer[String]()
+    Option(metadata.kyuubiInstance).foreach { _ =>
+      setClauses += "kyuubi_instance = ?"
+      params += metadata.kyuubiInstance
+    }
     Option(metadata.state).foreach { _ =>
       setClauses += "state = ?"
       params += metadata.state
     }
+    Option(metadata.requestConf).filter(_.nonEmpty).foreach { _ =>
+      setClauses += "request_conf =?"
+      params += valueAsString(metadata.requestConf)
+    }
+    metadata.clusterManager.foreach { cm =>
+      setClauses += "cluster_manager = ?"
+      params += cm
+    }
     if (metadata.endTime > 0) {
       setClauses += "end_time = ?"
       params += metadata.endTime
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 193bfbdfc..5749f7412 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -27,8 +27,10 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion
 
 import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.client.api.v1.dto.{Batch, BatchRequest}
+import org.apache.kyuubi.client.util.BatchUtils.KYUUBI_BATCH_ID_KEY
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_REAL_USER_KEY
 import org.apache.kyuubi.credentials.HadoopCredentialsManager
 import org.apache.kyuubi.engine.KyuubiApplicationManager
 import org.apache.kyuubi.metrics.MetricsConstants._
@@ -219,6 +221,34 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
     openBatchSession(batchSession)
   }
 
+  def initializeBatchState(
+      user: String,
+      ipAddress: String,
+      conf: Map[String, String],
+      batchRequest: BatchRequest): String = {
+    val realUser = conf.getOrElse(KYUUBI_SESSION_REAL_USER_KEY, user)
+    val username = Option(user).filter(_.nonEmpty).getOrElse("anonymous")
+    val batchId = conf(KYUUBI_BATCH_ID_KEY)
+    val metadata = Metadata(
+      identifier = batchId,
+      sessionType = SessionType.BATCH,
+      realUser = realUser,
+      username = username,
+      ipAddress = ipAddress,
+      state = OperationState.INITIALIZED.toString,
+      resource = batchRequest.getResource,
+      className = batchRequest.getClassName,
+      requestName = batchRequest.getName,
+      requestConf = conf,
+      requestArgs = batchRequest.getArgs.asScala,
+      createTime = System.currentTimeMillis(),
+      engineType = batchRequest.getBatchType)
+
+    // there is a chance that operation failed w/ duplicated key error
+    metadataManager.foreach(_.insertMetadata(metadata, asyncRetryOnError = 
false))
+    batchId
+  }
+
   def getBatchSession(sessionHandle: SessionHandle): 
Option[KyuubiBatchSession] = {
     getSessionOption(sessionHandle).map(_.asInstanceOf[KyuubiBatchSession])
   }
diff --git a/kyuubi-server/src/test/resources/log4j2-test.xml 
b/kyuubi-server/src/test/resources/log4j2-test.xml
index 623dd71fd..25e37e859 100644
--- a/kyuubi-server/src/test/resources/log4j2-test.xml
+++ b/kyuubi-server/src/test/resources/log4j2-test.xml
@@ -48,5 +48,9 @@
         <Logger 
name="org.apache.kyuubi.server.http.authentication.AuthenticationAuditLogger" 
additivity="false">
             <AppenderRef ref="restAudit" />
         </Logger>
+        <Logger name="org.apache.kyuubi.server.metadata.jdbc" level="DEBUG" 
additivity="false">
+            <AppenderRef ref="stdout" />
+            <AppenderRef ref="file"/>
+        </Logger>
     </Loggers>
 </Configuration>
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
index dd62ee48d..ef581f414 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
@@ -48,14 +48,40 @@ import org.apache.kyuubi.server.metadata.api.{Metadata, 
MetadataFilter}
 import org.apache.kyuubi.service.authentication.{InternalSecurityAccessor, 
KyuubiAuthenticationFactory}
 import org.apache.kyuubi.session.{KyuubiBatchSession, KyuubiSessionManager, 
SessionHandle, SessionType}
 
-class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper 
with BatchTestHelper {
-  override protected lazy val conf: KyuubiConf = KyuubiConf()
-    .set(KyuubiConf.ENGINE_SECURITY_ENABLED, true)
-    .set(KyuubiConf.ENGINE_SECURITY_SECRET_PROVIDER, "simple")
-    .set(KyuubiConf.SIMPLE_SECURITY_SECRET_PROVIDER_PROVIDER_SECRET, 
"ENGINE____SECRET")
-    .set(
-      KyuubiConf.SESSION_LOCAL_DIR_ALLOW_LIST,
-      Seq(Paths.get(sparkBatchTestResource.get).getParent.toString))
+class BatchesV1ResourceSuite extends BatchesResourceSuiteBase {
+  override def batchVersion: String = "1"
+
+  override def customConf: Map[String, String] = Map.empty
+}
+
+class BatchesV2ResourceSuite extends BatchesResourceSuiteBase {
+  override def batchVersion: String = "2"
+
+  override def customConf: Map[String, String] = Map(
+    KyuubiConf.METADATA_REQUEST_ASYNC_RETRY_ENABLED.key -> "false",
+    KyuubiConf.BATCH_SUBMITTER_ENABLED.key -> "true")
+}
+
+abstract class BatchesResourceSuiteBase extends KyuubiFunSuite
+  with RestFrontendTestHelper
+  with BatchTestHelper {
+
+  def batchVersion: String
+
+  def customConf: Map[String, String]
+
+  override protected lazy val conf: KyuubiConf = {
+    val kyuubiConf = KyuubiConf()
+      .set(KyuubiConf.ENGINE_SECURITY_ENABLED, true)
+      .set(KyuubiConf.ENGINE_SECURITY_SECRET_PROVIDER, "simple")
+      .set(KyuubiConf.SIMPLE_SECURITY_SECRET_PROVIDER_PROVIDER_SECRET, 
"ENGINE____SECRET")
+      .set(KyuubiConf.BATCH_IMPL_VERSION, batchVersion)
+      .set(
+        KyuubiConf.SESSION_LOCAL_DIR_ALLOW_LIST,
+        Seq(Paths.get(sparkBatchTestResource.get).getParent.toString))
+    customConf.foreach { case (k, v) => kyuubiConf.set(k, v) }
+    kyuubiConf
+  }
 
   override def beforeAll(): Unit = {
     super.beforeAll()
@@ -79,9 +105,18 @@ class BatchesResourceSuite extends KyuubiFunSuite with 
RestFrontendTestHelper wi
     val response = webTarget.path("api/v1/batches")
       .request(MediaType.APPLICATION_JSON_TYPE)
       .post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
-    assert(200 == response.getStatus)
+    assert(response.getStatus === 200)
     var batch = response.readEntity(classOf[Batch])
-    assert(batch.getKyuubiInstance === fe.connectionUrl)
+    batchVersion match {
+      case "1" =>
+        assert(batch.getKyuubiInstance === fe.connectionUrl)
+      case "2" if batch.getState === "INITIALIZED" =>
+        assert(batch.getKyuubiInstance === null)
+      case "2" if batch.getState === "PENDING" => // batch picked by 
BatchService
+        assert(batch.getKyuubiInstance === fe.connectionUrl)
+      case _ =>
+        fail(s"unexpected batch info, version: $batchVersion state: 
${batch.getState}")
+    }
     assert(batch.getBatchType === "SPARK")
     assert(batch.getName === sparkBatchTestAppName)
     assert(batch.getCreateTime > 0)
@@ -93,16 +128,25 @@ class BatchesResourceSuite extends KyuubiFunSuite with 
RestFrontendTestHelper wi
     val proxyUserResponse = webTarget.path("api/v1/batches")
       .request(MediaType.APPLICATION_JSON_TYPE)
       .post(Entity.entity(proxyUserRequest, MediaType.APPLICATION_JSON_TYPE))
-    assert(405 == proxyUserResponse.getStatus)
+    assert(proxyUserResponse.getStatus === 405)
     var errorMessage = "Failed to validate proxy privilege of anonymous for 
root"
     
assert(proxyUserResponse.readEntity(classOf[String]).contains(errorMessage))
 
-    var getBatchResponse = webTarget.path(s"api/v1/batches/${batch.getId()}")
+    var getBatchResponse = webTarget.path(s"api/v1/batches/${batch.getId}")
       .request(MediaType.APPLICATION_JSON_TYPE)
       .get()
-    assert(200 == getBatchResponse.getStatus)
+    assert(getBatchResponse.getStatus === 200)
     batch = getBatchResponse.readEntity(classOf[Batch])
-    assert(batch.getKyuubiInstance === fe.connectionUrl)
+    batchVersion match {
+      case "1" =>
+        assert(batch.getKyuubiInstance === fe.connectionUrl)
+      case "2" if batch.getState === "INITIALIZED" =>
+        assert(batch.getKyuubiInstance === null)
+      case "2" if batch.getState === "PENDING" => // batch picked by 
BatchService
+        assert(batch.getKyuubiInstance === fe.connectionUrl)
+      case _ =>
+        fail(s"unexpected batch info, version: $batchVersion state: 
${batch.getState}")
+    }
     assert(batch.getBatchType === "SPARK")
     assert(batch.getName === sparkBatchTestAppName)
     assert(batch.getCreateTime > 0)
@@ -115,26 +159,26 @@ class BatchesResourceSuite extends KyuubiFunSuite with 
RestFrontendTestHelper wi
     getBatchResponse = webTarget.path(s"api/v1/batches/invalidBatchId")
       .request(MediaType.APPLICATION_JSON_TYPE)
       .get()
-    assert(404 == getBatchResponse.getStatus)
+    assert(getBatchResponse.getStatus === 404)
 
     // get batch log
     var logResponse: Response = null
     var log: OperationLog = null
     eventually(timeout(10.seconds), interval(1.seconds)) {
-      logResponse = webTarget.path(s"api/v1/batches/${batch.getId()}/localLog")
+      logResponse = webTarget.path(s"api/v1/batches/${batch.getId}/localLog")
         .queryParam("from", "0")
         .queryParam("size", "1")
         .request(MediaType.APPLICATION_JSON_TYPE)
         .get()
       log = logResponse.readEntity(classOf[OperationLog])
-      assert(log.getRowCount == 1)
+      assert(log.getRowCount === 1)
     }
     val head = log.getLogRowSet.asScala.head
 
     val logs = new ArrayBuffer[String]
     logs.append(head)
     eventually(timeout(10.seconds), interval(1.seconds)) {
-      logResponse = webTarget.path(s"api/v1/batches/${batch.getId()}/localLog")
+      logResponse = webTarget.path(s"api/v1/batches/${batch.getId}/localLog")
         .queryParam("from", "-1")
         .queryParam("size", "100")
         .request(MediaType.APPLICATION_JSON_TYPE)
@@ -146,67 +190,67 @@ class BatchesResourceSuite extends KyuubiFunSuite with 
RestFrontendTestHelper wi
 
       // check both kyuubi log and engine log
       assert(
-        logs.exists(_.contains("/bin/spark-submit")) && logs.exists(
-          _.contains(s"SparkContext: Submitted application: 
$sparkBatchTestAppName")))
+        logs.exists(_.contains("/bin/spark-submit")) &&
+          logs.exists(_.contains(s"SparkContext: Submitted application: 
$sparkBatchTestAppName")))
     }
 
     // invalid user name
     val encodeAuthorization =
-      new String(Base64.getEncoder.encode(batch.getId().getBytes()), "UTF-8")
-    var deleteBatchResponse = 
webTarget.path(s"api/v1/batches/${batch.getId()}")
+      new String(Base64.getEncoder.encode(batch.getId.getBytes()), "UTF-8")
+    var deleteBatchResponse = webTarget.path(s"api/v1/batches/${batch.getId}")
       .request(MediaType.APPLICATION_JSON_TYPE)
       .header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization")
       .delete()
-    assert(405 == deleteBatchResponse.getStatus)
-    errorMessage = s"${batch.getId()} is not allowed to close the session 
belong to anonymous"
+    assert(deleteBatchResponse.getStatus === 405)
+    errorMessage = s"${batch.getId} is not allowed to close the session belong 
to anonymous"
     
assert(deleteBatchResponse.readEntity(classOf[String]).contains(errorMessage))
 
     // invalid batchId
     deleteBatchResponse = webTarget.path(s"api/v1/batches/notValidUUID")
       .request(MediaType.APPLICATION_JSON_TYPE)
       .delete()
-    assert(404 == deleteBatchResponse.getStatus)
+    assert(deleteBatchResponse.getStatus === 404)
 
     // non-existed batch session
     deleteBatchResponse = 
webTarget.path(s"api/v1/batches/${UUID.randomUUID().toString}")
       .request(MediaType.APPLICATION_JSON_TYPE)
       .delete()
-    assert(404 == deleteBatchResponse.getStatus)
+    assert(deleteBatchResponse.getStatus === 404)
 
     // invalid proxy user
-    deleteBatchResponse = webTarget.path(s"api/v1/batches/${batch.getId()}")
+    deleteBatchResponse = webTarget.path(s"api/v1/batches/${batch.getId}")
       .queryParam("hive.server2.proxy.user", "invalidProxy")
       .request(MediaType.APPLICATION_JSON_TYPE)
       .delete()
-    assert(405 == deleteBatchResponse.getStatus)
+    assert(deleteBatchResponse.getStatus === 405)
     errorMessage = "Failed to validate proxy privilege of anonymous for 
invalidProxy"
     
assert(deleteBatchResponse.readEntity(classOf[String]).contains(errorMessage))
 
     // check close batch session
-    deleteBatchResponse = webTarget.path(s"api/v1/batches/${batch.getId()}")
+    deleteBatchResponse = webTarget.path(s"api/v1/batches/${batch.getId}")
       .request(MediaType.APPLICATION_JSON_TYPE)
       .delete()
-    assert(200 == deleteBatchResponse.getStatus)
+    assert(deleteBatchResponse.getStatus === 200)
     val closeBatchResponse = 
deleteBatchResponse.readEntity(classOf[CloseBatchResponse])
 
     // check state after close batch session
-    getBatchResponse = webTarget.path(s"api/v1/batches/${batch.getId()}")
+    getBatchResponse = webTarget.path(s"api/v1/batches/${batch.getId}")
       .request(MediaType.APPLICATION_JSON_TYPE)
       .get()
-    assert(200 == getBatchResponse.getStatus)
+    assert(getBatchResponse.getStatus === 200)
     batch = getBatchResponse.readEntity(classOf[Batch])
-    assert(batch.getId == batch.getId())
+    assert(batch.getId === batch.getId)
     if (closeBatchResponse.isSuccess) {
-      assert(batch.getState == "CANCELED")
+      assert(batch.getState === "CANCELED")
     } else {
       assert(batch.getState != "CANCELED")
     }
 
     // close the closed batch session
-    deleteBatchResponse = webTarget.path(s"api/v1/batches/${batch.getId()}")
+    deleteBatchResponse = webTarget.path(s"api/v1/batches/${batch.getId}")
       .request(MediaType.APPLICATION_JSON_TYPE)
       .delete()
-    assert(200 == deleteBatchResponse.getStatus)
+    assert(deleteBatchResponse.getStatus === 200)
     
assert(!deleteBatchResponse.readEntity(classOf[CloseBatchResponse]).isSuccess)
   }
 
@@ -221,17 +265,36 @@ class BatchesResourceSuite extends KyuubiFunSuite with 
RestFrontendTestHelper wi
     val response = webTarget.path("api/v1/batches")
       .request(MediaType.APPLICATION_JSON)
       .post(Entity.entity(multipart, MediaType.MULTIPART_FORM_DATA))
-    assert(200 == response.getStatus)
+    assert(response.getStatus === 200)
     val batch = response.readEntity(classOf[Batch])
-    assert(batch.getKyuubiInstance === fe.connectionUrl)
+    batchVersion match {
+      case "1" =>
+        assert(batch.getKyuubiInstance === fe.connectionUrl)
+      case "2" if batch.getState === "INITIALIZED" =>
+        assert(batch.getKyuubiInstance === null)
+      case "2" if batch.getState === "PENDING" => // batch picked by 
BatchService
+        assert(batch.getKyuubiInstance === fe.connectionUrl)
+      case _ =>
+        fail(s"unexpected batch info, version: $batchVersion state: 
${batch.getState}")
+    }
     assert(batch.getBatchType === "SPARK")
     assert(batch.getName === sparkBatchTestAppName)
     assert(batch.getCreateTime > 0)
     assert(batch.getEndTime === 0)
 
-    webTarget.path(s"api/v1/batches/${batch.getId()}").request(
-      MediaType.APPLICATION_JSON_TYPE).delete()
-    eventually(timeout(3.seconds)) {
+    // wait for batch be scheduled
+    eventually(timeout(5.seconds), interval(200.millis)) {
+      val resp = webTarget.path(s"api/v1/batches/${batch.getId}")
+        .request(MediaType.APPLICATION_JSON_TYPE)
+        .get()
+      val batchState = resp.readEntity(classOf[Batch]).getState
+      assert(batchState === "PENDING" || batchState === "RUNNING")
+    }
+
+    webTarget.path(s"api/v1/batches/${batch.getId}")
+      .request(MediaType.APPLICATION_JSON_TYPE)
+      .delete()
+    eventually(timeout(5.seconds), interval(200.millis)) {
       assert(KyuubiApplicationManager.uploadWorkDir.toFile.listFiles().isEmpty)
     }
   }
@@ -245,14 +308,14 @@ class BatchesResourceSuite extends KyuubiFunSuite with 
RestFrontendTestHelper wi
     val resp1 = webTarget.path("api/v1/batches")
       .request(MediaType.APPLICATION_JSON_TYPE)
       .post(Entity.entity(reqObj, MediaType.APPLICATION_JSON_TYPE))
-    assert(200 == resp1.getStatus)
+    assert(resp1.getStatus === 200)
     val batch1 = resp1.readEntity(classOf[Batch])
     assert(batch1.getId === batchId)
 
     val resp2 = webTarget.path("api/v1/batches")
       .request(MediaType.APPLICATION_JSON_TYPE)
       .post(Entity.entity(reqObj, MediaType.APPLICATION_JSON_TYPE))
-    assert(200 == resp2.getStatus)
+    assert(resp2.getStatus === 200)
     val batch2 = resp2.readEntity(classOf[Batch])
     assert(batch2.getId === batchId)
 
@@ -276,9 +339,9 @@ class BatchesResourceSuite extends KyuubiFunSuite with 
RestFrontendTestHelper wi
       .request(MediaType.APPLICATION_JSON_TYPE)
       .get()
 
-    assert(response.getStatus == 200)
+    assert(response.getStatus === 200)
     val getBatchListResponse = response.readEntity(classOf[GetBatchesResponse])
-    assert(getBatchListResponse.getBatches.isEmpty && 
getBatchListResponse.getTotal == 0)
+    assert(getBatchListResponse.getBatches.isEmpty && 
getBatchListResponse.getTotal === 0)
 
     sessionManager.openBatchSession(
       "kyuubi",
@@ -330,10 +393,10 @@ class BatchesResourceSuite extends KyuubiFunSuite with 
RestFrontendTestHelper wi
       .request(MediaType.APPLICATION_JSON_TYPE)
       .get()
 
-    assert(response2.getStatus == 200)
+    assert(response2.getStatus === 200)
 
     val getBatchListResponse2 = 
response2.readEntity(classOf[GetBatchesResponse])
-    assert(getBatchListResponse2.getTotal == 2)
+    assert(getBatchListResponse2.getTotal === 2)
 
     val response3 = webTarget.path("api/v1/batches")
       .queryParam("batchType", "spark")
@@ -342,10 +405,10 @@ class BatchesResourceSuite extends KyuubiFunSuite with 
RestFrontendTestHelper wi
       .request(MediaType.APPLICATION_JSON_TYPE)
       .get()
 
-    assert(response3.getStatus == 200)
+    assert(response3.getStatus === 200)
 
     val getBatchListResponse3 = 
response3.readEntity(classOf[GetBatchesResponse])
-    assert(getBatchListResponse3.getTotal == 1)
+    assert(getBatchListResponse3.getTotal === 1)
 
     val response4 = webTarget.path("api/v1/batches")
       .queryParam("batchType", "spark")
@@ -354,9 +417,9 @@ class BatchesResourceSuite extends KyuubiFunSuite with 
RestFrontendTestHelper wi
       .request(MediaType.APPLICATION_JSON_TYPE)
       .get()
 
-    assert(response4.getStatus == 200)
+    assert(response4.getStatus === 200)
     val getBatchListResponse4 = 
response4.readEntity(classOf[GetBatchesResponse])
-    assert(getBatchListResponse4.getBatches.isEmpty && 
getBatchListResponse4.getTotal == 0)
+    assert(getBatchListResponse4.getBatches.isEmpty && 
getBatchListResponse4.getTotal === 0)
 
     val response5 = webTarget.path("api/v1/batches")
       .queryParam("batchType", "mock")
@@ -365,10 +428,10 @@ class BatchesResourceSuite extends KyuubiFunSuite with 
RestFrontendTestHelper wi
       .request(MediaType.APPLICATION_JSON_TYPE)
       .get()
 
-    assert(response5.getStatus == 200)
+    assert(response5.getStatus === 200)
 
     val getBatchListResponse5 = 
response5.readEntity(classOf[GetBatchesResponse])
-    assert(getBatchListResponse5.getTotal == 0)
+    assert(getBatchListResponse5.getTotal === 0)
 
     // TODO add more test when add more batchType
     val response6 = webTarget.path("api/v1/batches")
@@ -377,9 +440,9 @@ class BatchesResourceSuite extends KyuubiFunSuite with 
RestFrontendTestHelper wi
       .request(MediaType.APPLICATION_JSON_TYPE)
       .get()
 
-    assert(response6.getStatus == 200)
+    assert(response6.getStatus === 200)
     val getBatchListResponse6 = 
response6.readEntity(classOf[GetBatchesResponse])
-    assert(getBatchListResponse6.getTotal == 1)
+    assert(getBatchListResponse6.getTotal === 1)
     sessionManager.allSessions().foreach(_.close())
 
     val queryCreateTime = System.currentTimeMillis()
@@ -420,7 +483,7 @@ class BatchesResourceSuite extends KyuubiFunSuite with 
RestFrontendTestHelper wi
       val response = webTarget.path("api/v1/batches")
         .request(MediaType.APPLICATION_JSON_TYPE)
         .post(Entity.entity(req, MediaType.APPLICATION_JSON_TYPE))
-      assert(500 == response.getStatus)
+      assert(response.getStatus === 500)
       assert(response.readEntity(classOf[String]).contains(msg))
     }
 
@@ -433,7 +496,7 @@ class BatchesResourceSuite extends KyuubiFunSuite with 
RestFrontendTestHelper wi
       val response = webTarget.path(s"api/v1/batches/$batchId")
         .request(MediaType.APPLICATION_JSON_TYPE)
         .get
-      assert(404 == response.getStatus)
+      assert(response.getStatus === 404)
       assert(response.readEntity(classOf[String]).contains(msg))
     }
   }
@@ -442,7 +505,7 @@ class BatchesResourceSuite extends KyuubiFunSuite with 
RestFrontendTestHelper wi
     val sessionManager = 
fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]
     val kyuubiInstance = fe.connectionUrl
 
-    assert(sessionManager.getOpenSessionCount == 0)
+    assert(sessionManager.getOpenSessionCount === 0)
     val batchId1 = UUID.randomUUID().toString
     val batchId2 = UUID.randomUUID().toString
 
@@ -502,7 +565,7 @@ class BatchesResourceSuite extends KyuubiFunSuite with 
RestFrontendTestHelper wi
 
     val restFe = fe.asInstanceOf[KyuubiRestFrontendService]
     restFe.recoverBatchSessions()
-    assert(sessionManager.getOpenSessionCount == 2)
+    assert(sessionManager.getOpenSessionCount === 2)
 
     val sessionHandle1 = SessionHandle.fromUUID(batchId1)
     val sessionHandle2 = SessionHandle.fromUUID(batchId2)
@@ -524,7 +587,7 @@ class BatchesResourceSuite extends KyuubiFunSuite with 
RestFrontendTestHelper wi
     assert(sessionManager.getBatchesFromMetadataStore(
       MetadataFilter(engineType = "SPARK"),
       0,
-      Int.MaxValue).size == 2)
+      Int.MaxValue).size === 2)
   }
 
   test("get local log internal redirection") {
@@ -549,7 +612,7 @@ class BatchesResourceSuite extends KyuubiFunSuite with 
RestFrontendTestHelper wi
       .queryParam("size", "1")
       .request(MediaType.APPLICATION_JSON_TYPE)
       .get()
-    assert(logResponse.getStatus == 404)
+    assert(logResponse.getStatus === 404)
     assert(logResponse.readEntity(classOf[String]).contains("No local log 
found"))
 
     // get local batch log that is not existing
@@ -558,7 +621,7 @@ class BatchesResourceSuite extends KyuubiFunSuite with 
RestFrontendTestHelper wi
       .queryParam("size", "1")
       .request(MediaType.APPLICATION_JSON_TYPE)
       .get()
-    assert(logResponse.getStatus == 404)
+    assert(logResponse.getStatus === 404)
     assert(logResponse.readEntity(classOf[String]).contains("Invalid batchId"))
 
     val metadata2 = metadata.copy(
@@ -572,7 +635,7 @@ class BatchesResourceSuite extends KyuubiFunSuite with 
RestFrontendTestHelper wi
       .queryParam("size", "1")
       .request(MediaType.APPLICATION_JSON_TYPE)
       .get()
-    assert(logResponse.getStatus == 500)
+    assert(logResponse.getStatus === 500)
     assert(logResponse.readEntity(classOf[String]).contains(
       s"Api request failed for http://${metadata2.kyuubiInstance}";))
   }
@@ -601,7 +664,7 @@ class BatchesResourceSuite extends KyuubiFunSuite with 
RestFrontendTestHelper wi
       .request(MediaType.APPLICATION_JSON_TYPE)
       .header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization")
       .delete()
-    assert(deleteResp.getStatus == 200)
+    assert(deleteResp.getStatus === 200)
     assert(!deleteResp.readEntity(classOf[CloseBatchResponse]).isSuccess)
 
     // delete batch that is not existing
@@ -609,7 +672,7 @@ class BatchesResourceSuite extends KyuubiFunSuite with 
RestFrontendTestHelper wi
       .request(MediaType.APPLICATION_JSON_TYPE)
       .header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization")
       .delete()
-    assert(deleteResp.getStatus == 404)
+    assert(deleteResp.getStatus === 404)
     assert(deleteResp.readEntity(classOf[String]).contains("Invalid batchId:"))
 
     val metadata2 = metadata.copy(
@@ -622,7 +685,7 @@ class BatchesResourceSuite extends KyuubiFunSuite with 
RestFrontendTestHelper wi
       .request(MediaType.APPLICATION_JSON_TYPE)
       .header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization")
       .delete()
-    assert(deleteResp.getStatus == 200)
+    assert(deleteResp.getStatus === 200)
     assert(deleteResp.readEntity(classOf[CloseBatchResponse]).getMsg.contains(
       s"Api request failed for http://${metadata2.kyuubiInstance}";))
   }
@@ -637,10 +700,12 @@ class BatchesResourceSuite extends KyuubiFunSuite with 
RestFrontendTestHelper wi
       .request(MediaType.APPLICATION_JSON_TYPE)
       .header(conf.get(FRONTEND_PROXY_HTTP_CLIENT_IP_HEADER), realClientIp)
       .post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
-    assert(200 == response.getStatus)
+    assert(response.getStatus === 200)
     val batch = response.readEntity(classOf[Batch])
-    val batchSession = 
sessionManager.getBatchSession(SessionHandle.fromUUID(batch.getId))
-    assert(batchSession.map(_.ipAddress).contains(realClientIp))
+    eventually(timeout(10.seconds)) {
+      val batchSession = 
sessionManager.getBatchSession(SessionHandle.fromUUID(batch.getId))
+      assert(batchSession.map(_.ipAddress).contains(realClientIp))
+    }
   }
 
   test("expose the metrics with operation type and current state") {
@@ -650,42 +715,47 @@ class BatchesResourceSuite extends KyuubiFunSuite with 
RestFrontendTestHelper wi
       assert(getBatchJobSubmissionStateCounter(OperationState.RUNNING) === 0)
     }
 
-    val originalTerminateCounter = 
getBatchJobSubmissionStateCounter(OperationState.CANCELED) +
-      getBatchJobSubmissionStateCounter(OperationState.FINISHED) +
-      getBatchJobSubmissionStateCounter(OperationState.ERROR)
-
-    val requestObj = newSparkBatchRequest(Map("spark.master" -> "local"))
-
-    val response = webTarget.path("api/v1/batches")
-      .request(MediaType.APPLICATION_JSON_TYPE)
-      .post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
-    assert(200 == response.getStatus)
-    var batch = response.readEntity(classOf[Batch])
+    val originalTerminatedCount =
+      getBatchJobSubmissionStateCounter(OperationState.CANCELED) +
+        getBatchJobSubmissionStateCounter(OperationState.FINISHED) +
+        getBatchJobSubmissionStateCounter(OperationState.ERROR)
 
-    assert(getBatchJobSubmissionStateCounter(OperationState.INITIALIZED) +
-      getBatchJobSubmissionStateCounter(OperationState.PENDING) +
-      getBatchJobSubmissionStateCounter(OperationState.RUNNING) === 1)
+    val batchId = UUID.randomUUID().toString
+    val requestObj = newSparkBatchRequest(Map(
+      "spark.master" -> "local",
+      KYUUBI_BATCH_ID_KEY -> batchId))
 
-    while (batch.getState == OperationState.PENDING.toString ||
-      batch.getState == OperationState.RUNNING.toString) {
-      val deleteResp = webTarget.path(s"api/v1/batches/${batch.getId}")
+    eventually(timeout(10.seconds)) {
+      val response = webTarget.path("api/v1/batches")
         .request(MediaType.APPLICATION_JSON_TYPE)
-        .delete()
-      assert(200 == deleteResp.getStatus)
+        .post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
+      assert(response.getStatus === 200)
+      val batch = response.readEntity(classOf[Batch])
+      assert(batch.getState === OperationState.PENDING.toString ||
+        batch.getState === OperationState.RUNNING.toString)
+    }
 
-      batch = webTarget.path(s"api/v1/batches/${batch.getId}")
-        .request(MediaType.APPLICATION_JSON_TYPE)
-        .get().readEntity(classOf[Batch])
+    eventually(timeout(10.seconds)) {
+      assert(getBatchJobSubmissionStateCounter(OperationState.INITIALIZED) +
+        getBatchJobSubmissionStateCounter(OperationState.PENDING) +
+        getBatchJobSubmissionStateCounter(OperationState.RUNNING) === 1)
     }
 
-    assert(getBatchJobSubmissionStateCounter(OperationState.INITIALIZED) === 0)
-    assert(getBatchJobSubmissionStateCounter(OperationState.PENDING) === 0)
-    assert(getBatchJobSubmissionStateCounter(OperationState.RUNNING) === 0)
+    val deleteResp = webTarget.path(s"api/v1/batches/$batchId")
+      .request(MediaType.APPLICATION_JSON_TYPE)
+      .delete()
+    assert(deleteResp.getStatus === 200)
+
+    eventually(timeout(10.seconds)) {
+      assert(getBatchJobSubmissionStateCounter(OperationState.INITIALIZED) === 
0)
+      assert(getBatchJobSubmissionStateCounter(OperationState.PENDING) === 0)
+      assert(getBatchJobSubmissionStateCounter(OperationState.RUNNING) === 0)
+    }
 
-    val currentTeminateCount = 
getBatchJobSubmissionStateCounter(OperationState.CANCELED) +
+    val currentTerminatedCount = 
getBatchJobSubmissionStateCounter(OperationState.CANCELED) +
       getBatchJobSubmissionStateCounter(OperationState.FINISHED) +
       getBatchJobSubmissionStateCounter(OperationState.ERROR)
-    assert(currentTeminateCount - originalTerminateCounter === 1)
+    assert(currentTerminatedCount - originalTerminatedCount === 1)
   }
 
   private def getBatchJobSubmissionStateCounter(state: OperationState): Long = 
{
@@ -706,9 +776,9 @@ class BatchesResourceSuite extends KyuubiFunSuite with 
RestFrontendTestHelper wi
         Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString),
         newSparkBatchRequest(Map("spark.jars" -> "disAllowPath")))
     }
-    val sessionHandleRegex = "\\[[\\S]*\\]".r
+    val sessionHandleRegex = "\\[\\S*]".r
     val batchId = 
sessionHandleRegex.findFirstMatchIn(e.getMessage).get.group(0)
-      .replaceAll("\\[", "").replaceAll("\\]", "")
+      .replaceAll("\\[", "").replaceAll("]", "")
     
assert(sessionManager.getBatchMetadata(batchId).map(_.state).contains("CANCELED"))
   }
 
