This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new ae3b81395 [KYUUBI #4798] Allows BatchJobSubmission to run in sync mode
ae3b81395 is described below

commit ae3b81395c9a5805d226d56671c2acdbbf365a31
Author: Cheng Pan <[email protected]>
AuthorDate: Sun May 7 19:31:45 2023 +0800

    [KYUUBI #4798] Allows BatchJobSubmission to run in sync mode
    
    ### _Why are the changes needed?_
    
    Currently, BatchJobSubmission is only allowed to run in async mode. This PR
makes `shouldRunAsync` configurable and allows BatchJobSubmission to run in
sync mode. (To minimize the change, in sync mode the real submission and
monitoring still happen on the exec pool; BatchJobSubmission just blocks
until the batch is finished.)
    
    This PR also refactors the constructor parameters of
`KyuubiBatchSessionImpl` and unwraps the BatchRequest so that it fits the
Batch V2 design.
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run 
test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests)
 locally before making a pull request
    
    Closes #4798 from pan3793/batch-sync.
    
    Closes #4798
    
    38eee2708 [Cheng Pan] Allows BatchJobSubmission run in sync mode
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../kyuubi/operation/BatchJobSubmission.scala      | 52 ++++++++----------
 .../kyuubi/operation/KyuubiOperationManager.scala  |  6 ++-
 .../kyuubi/session/KyuubiBatchSessionImpl.scala    | 57 ++++++++++----------
 .../kyuubi/session/KyuubiSessionManager.scala      | 62 +++++++++++++++-------
 4 files changed, 98 insertions(+), 79 deletions(-)

diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 702a9a917..e77416d31 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -58,12 +58,11 @@ class BatchJobSubmission(
     className: String,
     batchConf: Map[String, String],
     batchArgs: Seq[String],
-    recoveryMetadata: Option[Metadata])
+    recoveryMetadata: Option[Metadata],
+    override val shouldRunAsync: Boolean)
   extends KyuubiApplicationOperation(session) {
   import BatchJobSubmission._
 
-  override def shouldRunAsync: Boolean = true
-
   private val _operationLog = OperationLog.createOperationLog(session, 
getHandle)
 
   private val applicationManager = session.sessionManager.applicationManager
@@ -131,17 +130,10 @@ class BatchJobSubmission(
     session.sessionConf.get(KyuubiConf.BATCH_APPLICATION_STARVATION_TIMEOUT)
 
   private def updateBatchMetadata(): Unit = {
-    val endTime =
-      if (isTerminalState(state)) {
-        lastAccessTime
-      } else {
-        0L
-      }
+    val endTime = if (isTerminalState(state)) lastAccessTime else 0L
 
-    if (isTerminalState(state)) {
-      if (_applicationInfo.isEmpty) {
-        _applicationInfo = Some(ApplicationInfo.NOT_FOUND)
-      }
+    if (isTerminalState(state) && _applicationInfo.isEmpty) {
+      _applicationInfo = Some(ApplicationInfo.NOT_FOUND)
     }
 
     _applicationInfo.foreach { appInfo =>
@@ -187,27 +179,24 @@ class BatchJobSubmission(
   override protected def runInternal(): Unit = session.handleSessionException {
     val asyncOperation: Runnable = () => {
       try {
-        if (recoveryMetadata.exists(_.peerInstanceClosed)) {
-          setState(OperationState.CANCELED)
-        } else {
-          // If it is in recovery mode, only re-submit batch job if previous 
state is PENDING and
-          // fail to fetch the status including appId from resource manager. 
Otherwise, monitor the
-          // submitted batch application.
-          recoveryMetadata.map { metadata =>
-            if (metadata.state == OperationState.PENDING.toString) {
-              _applicationInfo = currentApplicationInfo()
-              applicationId(_applicationInfo) match {
-                case Some(appId) => monitorBatchJob(appId)
-                case None => submitAndMonitorBatchJob()
-              }
-            } else {
-              monitorBatchJob(metadata.engineId)
+        recoveryMetadata match {
+          case Some(metadata) if metadata.peerInstanceClosed =>
+            setState(OperationState.CANCELED)
+          case Some(metadata) if metadata.state == 
OperationState.PENDING.toString =>
+            // In recovery mode, only submit batch job when previous state is 
PENDING
+            // and fail to fetch the status including appId from resource 
manager.
+            // Otherwise, monitor the submitted batch application.
+            _applicationInfo = currentApplicationInfo()
+            applicationId(_applicationInfo) match {
+              case Some(appId) => monitorBatchJob(appId)
+              case None => submitAndMonitorBatchJob()
             }
-          }.getOrElse {
+          case Some(metadata) =>
+            monitorBatchJob(metadata.engineId)
+          case None =>
             submitAndMonitorBatchJob()
-          }
-          setStateIfNotCanceled(OperationState.FINISHED)
         }
+        setStateIfNotCanceled(OperationState.FINISHED)
       } catch {
         onError()
       } finally {
@@ -225,6 +214,7 @@ class BatchJobSubmission(
         updateBatchMetadata()
       }
     }
+    if (!shouldRunAsync) getBackgroundHandle.get()
   }
 
   private def submitAndMonitorBatchJob(): Unit = {
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
index dd4889653..6846d0316 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
@@ -81,7 +81,8 @@ class KyuubiOperationManager private (name: String) extends 
OperationManager(nam
       className: String,
       batchConf: Map[String, String],
       batchArgs: Seq[String],
-      recoveryMetadata: Option[Metadata]): BatchJobSubmission = {
+      recoveryMetadata: Option[Metadata],
+      shouldRunAsync: Boolean): BatchJobSubmission = {
     val operation = new BatchJobSubmission(
       session,
       batchType,
@@ -90,7 +91,8 @@ class KyuubiOperationManager private (name: String) extends 
OperationManager(nam
       className,
       batchConf,
       batchArgs,
-      recoveryMetadata)
+      recoveryMetadata,
+      shouldRunAsync)
     addOperation(operation)
     operation
   }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
index 94859a08c..ba2046829 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
@@ -21,7 +21,6 @@ import scala.collection.JavaConverters._
 
 import org.apache.hive.service.rpc.thrift.TProtocolVersion
 
-import org.apache.kyuubi.client.api.v1.dto.BatchRequest
 import org.apache.kyuubi.client.util.BatchUtils._
 import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
 import org.apache.kyuubi.engine.KyuubiApplicationManager
@@ -38,8 +37,14 @@ class KyuubiBatchSessionImpl(
     conf: Map[String, String],
     override val sessionManager: KyuubiSessionManager,
     val sessionConf: KyuubiConf,
-    batchRequest: BatchRequest,
-    recoveryMetadata: Option[Metadata] = None)
+    batchType: String,
+    batchName: Option[String],
+    resource: String,
+    className: String,
+    batchConf: Map[String, String],
+    batchArgs: Seq[String],
+    recoveryMetadata: Option[Metadata] = None,
+    shouldRunAsync: Boolean)
   extends KyuubiSession(
     TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1,
     user,
@@ -68,42 +73,41 @@ class KyuubiBatchSessionImpl(
   override val sessionIdleTimeoutThreshold: Long =
     sessionManager.getConf.get(KyuubiConf.BATCH_SESSION_IDLE_TIMEOUT)
 
-  override val normalizedConf: Map[String, String] = {
-    sessionConf.getBatchConf(batchRequest.getBatchType) ++
-      sessionManager.validateBatchConf(batchRequest.getConf.asScala.toMap)
-  }
+  override val normalizedConf: Map[String, String] =
+    sessionConf.getBatchConf(batchType) ++ 
sessionManager.validateBatchConf(batchConf)
 
-  private val optimizedConf: Map[String, String] = {
+  val optimizedConf: Map[String, String] = {
     val confOverlay = sessionManager.sessionConfAdvisor.getConfOverlay(
       user,
       normalizedConf.asJava)
     if (confOverlay != null) {
       val overlayConf = new KyuubiConf(false)
       confOverlay.asScala.foreach { case (k, v) => overlayConf.set(k, v) }
-      normalizedConf ++ overlayConf.getBatchConf(batchRequest.getBatchType)
+      normalizedConf ++ overlayConf.getBatchConf(batchType)
     } else {
       warn(s"the server plugin return null value for user: $user, ignore it")
       normalizedConf
     }
   }
 
-  override lazy val name: Option[String] = Option(batchRequest.getName).orElse(
-    optimizedConf.get(KyuubiConf.SESSION_NAME.key))
+  override lazy val name: Option[String] =
+    
batchName.filterNot(_.trim.isEmpty).orElse(optimizedConf.get(KyuubiConf.SESSION_NAME.key))
 
   // whether the resource file is from uploading
-  private[kyuubi] val isResourceUploaded: Boolean = batchRequest.getConf
-    .getOrDefault(KyuubiReservedKeys.KYUUBI_BATCH_RESOURCE_UPLOADED_KEY, 
"false").toBoolean
+  private[kyuubi] val isResourceUploaded: Boolean =
+    batchConf.getOrElse(KyuubiReservedKeys.KYUUBI_BATCH_RESOURCE_UPLOADED_KEY, 
"false").toBoolean
 
   private[kyuubi] lazy val batchJobSubmissionOp = 
sessionManager.operationManager
     .newBatchJobSubmissionOperation(
       this,
-      batchRequest.getBatchType,
+      batchType,
       name.orNull,
-      batchRequest.getResource,
-      batchRequest.getClassName,
+      resource,
+      className,
       optimizedConf,
-      batchRequest.getArgs.asScala,
-      recoveryMetadata)
+      batchArgs,
+      recoveryMetadata,
+      shouldRunAsync)
 
   private def waitMetadataRequestsRetryCompletion(): Unit = {
     val batchId = batchJobSubmissionOp.batchId
@@ -127,14 +131,11 @@ class KyuubiBatchSessionImpl(
 
   override def checkSessionAccessPathURIs(): Unit = {
     KyuubiApplicationManager.checkApplicationAccessPaths(
-      batchRequest.getBatchType,
+      batchType,
       optimizedConf,
       sessionManager.getConf)
-    if (batchRequest.getResource != SparkProcessBuilder.INTERNAL_RESOURCE
-      && !isResourceUploaded) {
-      KyuubiApplicationManager.checkApplicationAccessPath(
-        batchRequest.getResource,
-        sessionManager.getConf)
+    if (resource != SparkProcessBuilder.INTERNAL_RESOURCE && 
!isResourceUploaded) {
+      KyuubiApplicationManager.checkApplicationAccessPath(resource, 
sessionManager.getConf)
     }
   }
 
@@ -150,13 +151,13 @@ class KyuubiBatchSessionImpl(
         ipAddress = ipAddress,
         kyuubiInstance = connectionUrl,
         state = OperationState.PENDING.toString,
-        resource = batchRequest.getResource,
-        className = batchRequest.getClassName,
+        resource = resource,
+        className = className,
         requestName = name.orNull,
         requestConf = optimizedConf,
-        requestArgs = batchRequest.getArgs.asScala,
+        requestArgs = batchArgs,
         createTime = createTime,
-        engineType = batchRequest.getBatchType,
+        engineType = batchType,
         clusterManager = batchJobSubmissionOp.builder.clusterManager())
 
       // there is a chance that operation failed w/ duplicated key error
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index b0ed144a5..0ef3f1ac1 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -124,23 +124,38 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
     }
   }
 
-  private def createBatchSession(
+  // scalastyle:off
+  def createBatchSession(
       user: String,
       password: String,
       ipAddress: String,
       conf: Map[String, String],
-      batchRequest: BatchRequest,
-      recoveryMetadata: Option[Metadata] = None): KyuubiBatchSessionImpl = {
+      batchType: String,
+      batchName: Option[String],
+      resource: String,
+      className: String,
+      batchConf: Map[String, String],
+      batchArgs: Seq[String],
+      recoveryMetadata: Option[Metadata] = None,
+      shouldRunAsync: Boolean): KyuubiBatchSessionImpl = {
+    // scalastyle:on
     val username = Option(user).filter(_.nonEmpty).getOrElse("anonymous")
+    val sessionConf = this.getConf.getUserDefaults(user)
     new KyuubiBatchSessionImpl(
       username,
       password,
       ipAddress,
       conf,
       this,
-      this.getConf.getUserDefaults(user),
-      batchRequest,
-      recoveryMetadata)
+      sessionConf,
+      batchType,
+      batchName,
+      resource,
+      className,
+      batchConf,
+      batchArgs,
+      recoveryMetadata,
+      shouldRunAsync)
   }
 
   private[kyuubi] def openBatchSession(batchSession: KyuubiBatchSessionImpl): 
SessionHandle = {
@@ -178,8 +193,21 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
       password: String,
       ipAddress: String,
       conf: Map[String, String],
-      batchRequest: BatchRequest): SessionHandle = {
-    val batchSession = createBatchSession(user, password, ipAddress, conf, 
batchRequest)
+      batchRequest: BatchRequest,
+      shouldRunAsync: Boolean = true): SessionHandle = {
+    val batchSession = createBatchSession(
+      user,
+      password,
+      ipAddress,
+      conf,
+      batchRequest.getBatchType,
+      Option(batchRequest.getName),
+      batchRequest.getResource,
+      batchRequest.getClassName,
+      batchRequest.getConf.asScala.toMap,
+      batchRequest.getArgs.asScala,
+      None,
+      shouldRunAsync)
     openBatchSession(batchSession)
   }
 
@@ -246,21 +274,19 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
         kyuubiInstance,
         0,
         Int.MaxValue).map { metadata =>
-        val batchRequest = new BatchRequest(
-          metadata.engineType,
-          metadata.resource,
-          metadata.className,
-          metadata.requestName,
-          metadata.requestConf.asJava,
-          metadata.requestArgs.asJava)
-
         createBatchSession(
           metadata.username,
           "anonymous",
           metadata.ipAddress,
           metadata.requestConf,
-          batchRequest,
-          Some(metadata))
+          metadata.engineType,
+          Option(metadata.requestName),
+          metadata.resource,
+          metadata.className,
+          metadata.requestConf,
+          metadata.requestArgs,
+          Some(metadata),
+          shouldRunAsync = true)
       }).getOrElse(Seq.empty)
     }
   }

Reply via email to