This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new ae3b81395 [KYUUBI #4798] Allows BatchJobSubmission to run in sync mode
ae3b81395 is described below
commit ae3b81395c9a5805d226d56671c2acdbbf365a31
Author: Cheng Pan <[email protected]>
AuthorDate: Sun May 7 19:31:45 2023 +0800
[KYUUBI #4798] Allows BatchJobSubmission to run in sync mode
### _Why are the changes needed?_
Currently, BatchJobSubmission is only allowed to run in async mode. This PR
makes `shouldRunAsync` configurable, allowing BatchJobSubmission to run in
sync mode. (To minimize the change, in sync mode the actual submission and
monitoring still happen on the exec pool; BatchJobSubmission just blocks
until the batch is finished.)
This PR also refactors the constructor parameters of
`KyuubiBatchSessionImpl`, unwrapping the BatchRequest into individual
parameters so that it fits the Batch V2 design.
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #4798 from pan3793/batch-sync.
Closes #4798
38eee2708 [Cheng Pan] Allows BatchJobSubmission run in sync mode
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../kyuubi/operation/BatchJobSubmission.scala | 52 ++++++++----------
.../kyuubi/operation/KyuubiOperationManager.scala | 6 ++-
.../kyuubi/session/KyuubiBatchSessionImpl.scala | 57 ++++++++++----------
.../kyuubi/session/KyuubiSessionManager.scala | 62 +++++++++++++++-------
4 files changed, 98 insertions(+), 79 deletions(-)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 702a9a917..e77416d31 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -58,12 +58,11 @@ class BatchJobSubmission(
className: String,
batchConf: Map[String, String],
batchArgs: Seq[String],
- recoveryMetadata: Option[Metadata])
+ recoveryMetadata: Option[Metadata],
+ override val shouldRunAsync: Boolean)
extends KyuubiApplicationOperation(session) {
import BatchJobSubmission._
- override def shouldRunAsync: Boolean = true
-
private val _operationLog = OperationLog.createOperationLog(session,
getHandle)
private val applicationManager = session.sessionManager.applicationManager
@@ -131,17 +130,10 @@ class BatchJobSubmission(
session.sessionConf.get(KyuubiConf.BATCH_APPLICATION_STARVATION_TIMEOUT)
private def updateBatchMetadata(): Unit = {
- val endTime =
- if (isTerminalState(state)) {
- lastAccessTime
- } else {
- 0L
- }
+ val endTime = if (isTerminalState(state)) lastAccessTime else 0L
- if (isTerminalState(state)) {
- if (_applicationInfo.isEmpty) {
- _applicationInfo = Some(ApplicationInfo.NOT_FOUND)
- }
+ if (isTerminalState(state) && _applicationInfo.isEmpty) {
+ _applicationInfo = Some(ApplicationInfo.NOT_FOUND)
}
_applicationInfo.foreach { appInfo =>
@@ -187,27 +179,24 @@ class BatchJobSubmission(
override protected def runInternal(): Unit = session.handleSessionException {
val asyncOperation: Runnable = () => {
try {
- if (recoveryMetadata.exists(_.peerInstanceClosed)) {
- setState(OperationState.CANCELED)
- } else {
- // If it is in recovery mode, only re-submit batch job if previous
state is PENDING and
- // fail to fetch the status including appId from resource manager.
Otherwise, monitor the
- // submitted batch application.
- recoveryMetadata.map { metadata =>
- if (metadata.state == OperationState.PENDING.toString) {
- _applicationInfo = currentApplicationInfo()
- applicationId(_applicationInfo) match {
- case Some(appId) => monitorBatchJob(appId)
- case None => submitAndMonitorBatchJob()
- }
- } else {
- monitorBatchJob(metadata.engineId)
+ recoveryMetadata match {
+ case Some(metadata) if metadata.peerInstanceClosed =>
+ setState(OperationState.CANCELED)
+ case Some(metadata) if metadata.state ==
OperationState.PENDING.toString =>
+ // In recovery mode, only submit batch job when previous state is
PENDING
+ // and fail to fetch the status including appId from resource
manager.
+ // Otherwise, monitor the submitted batch application.
+ _applicationInfo = currentApplicationInfo()
+ applicationId(_applicationInfo) match {
+ case Some(appId) => monitorBatchJob(appId)
+ case None => submitAndMonitorBatchJob()
}
- }.getOrElse {
+ case Some(metadata) =>
+ monitorBatchJob(metadata.engineId)
+ case None =>
submitAndMonitorBatchJob()
- }
- setStateIfNotCanceled(OperationState.FINISHED)
}
+ setStateIfNotCanceled(OperationState.FINISHED)
} catch {
onError()
} finally {
@@ -225,6 +214,7 @@ class BatchJobSubmission(
updateBatchMetadata()
}
}
+ if (!shouldRunAsync) getBackgroundHandle.get()
}
private def submitAndMonitorBatchJob(): Unit = {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
index dd4889653..6846d0316 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
@@ -81,7 +81,8 @@ class KyuubiOperationManager private (name: String) extends
OperationManager(nam
className: String,
batchConf: Map[String, String],
batchArgs: Seq[String],
- recoveryMetadata: Option[Metadata]): BatchJobSubmission = {
+ recoveryMetadata: Option[Metadata],
+ shouldRunAsync: Boolean): BatchJobSubmission = {
val operation = new BatchJobSubmission(
session,
batchType,
@@ -90,7 +91,8 @@ class KyuubiOperationManager private (name: String) extends
OperationManager(nam
className,
batchConf,
batchArgs,
- recoveryMetadata)
+ recoveryMetadata,
+ shouldRunAsync)
addOperation(operation)
operation
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
index 94859a08c..ba2046829 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala
@@ -21,7 +21,6 @@ import scala.collection.JavaConverters._
import org.apache.hive.service.rpc.thrift.TProtocolVersion
-import org.apache.kyuubi.client.api.v1.dto.BatchRequest
import org.apache.kyuubi.client.util.BatchUtils._
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.engine.KyuubiApplicationManager
@@ -38,8 +37,14 @@ class KyuubiBatchSessionImpl(
conf: Map[String, String],
override val sessionManager: KyuubiSessionManager,
val sessionConf: KyuubiConf,
- batchRequest: BatchRequest,
- recoveryMetadata: Option[Metadata] = None)
+ batchType: String,
+ batchName: Option[String],
+ resource: String,
+ className: String,
+ batchConf: Map[String, String],
+ batchArgs: Seq[String],
+ recoveryMetadata: Option[Metadata] = None,
+ shouldRunAsync: Boolean)
extends KyuubiSession(
TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1,
user,
@@ -68,42 +73,41 @@ class KyuubiBatchSessionImpl(
override val sessionIdleTimeoutThreshold: Long =
sessionManager.getConf.get(KyuubiConf.BATCH_SESSION_IDLE_TIMEOUT)
- override val normalizedConf: Map[String, String] = {
- sessionConf.getBatchConf(batchRequest.getBatchType) ++
- sessionManager.validateBatchConf(batchRequest.getConf.asScala.toMap)
- }
+ override val normalizedConf: Map[String, String] =
+ sessionConf.getBatchConf(batchType) ++
sessionManager.validateBatchConf(batchConf)
- private val optimizedConf: Map[String, String] = {
+ val optimizedConf: Map[String, String] = {
val confOverlay = sessionManager.sessionConfAdvisor.getConfOverlay(
user,
normalizedConf.asJava)
if (confOverlay != null) {
val overlayConf = new KyuubiConf(false)
confOverlay.asScala.foreach { case (k, v) => overlayConf.set(k, v) }
- normalizedConf ++ overlayConf.getBatchConf(batchRequest.getBatchType)
+ normalizedConf ++ overlayConf.getBatchConf(batchType)
} else {
warn(s"the server plugin return null value for user: $user, ignore it")
normalizedConf
}
}
- override lazy val name: Option[String] = Option(batchRequest.getName).orElse(
- optimizedConf.get(KyuubiConf.SESSION_NAME.key))
+ override lazy val name: Option[String] =
+
batchName.filterNot(_.trim.isEmpty).orElse(optimizedConf.get(KyuubiConf.SESSION_NAME.key))
// whether the resource file is from uploading
- private[kyuubi] val isResourceUploaded: Boolean = batchRequest.getConf
- .getOrDefault(KyuubiReservedKeys.KYUUBI_BATCH_RESOURCE_UPLOADED_KEY,
"false").toBoolean
+ private[kyuubi] val isResourceUploaded: Boolean =
+ batchConf.getOrElse(KyuubiReservedKeys.KYUUBI_BATCH_RESOURCE_UPLOADED_KEY,
"false").toBoolean
private[kyuubi] lazy val batchJobSubmissionOp =
sessionManager.operationManager
.newBatchJobSubmissionOperation(
this,
- batchRequest.getBatchType,
+ batchType,
name.orNull,
- batchRequest.getResource,
- batchRequest.getClassName,
+ resource,
+ className,
optimizedConf,
- batchRequest.getArgs.asScala,
- recoveryMetadata)
+ batchArgs,
+ recoveryMetadata,
+ shouldRunAsync)
private def waitMetadataRequestsRetryCompletion(): Unit = {
val batchId = batchJobSubmissionOp.batchId
@@ -127,14 +131,11 @@ class KyuubiBatchSessionImpl(
override def checkSessionAccessPathURIs(): Unit = {
KyuubiApplicationManager.checkApplicationAccessPaths(
- batchRequest.getBatchType,
+ batchType,
optimizedConf,
sessionManager.getConf)
- if (batchRequest.getResource != SparkProcessBuilder.INTERNAL_RESOURCE
- && !isResourceUploaded) {
- KyuubiApplicationManager.checkApplicationAccessPath(
- batchRequest.getResource,
- sessionManager.getConf)
+ if (resource != SparkProcessBuilder.INTERNAL_RESOURCE &&
!isResourceUploaded) {
+ KyuubiApplicationManager.checkApplicationAccessPath(resource,
sessionManager.getConf)
}
}
@@ -150,13 +151,13 @@ class KyuubiBatchSessionImpl(
ipAddress = ipAddress,
kyuubiInstance = connectionUrl,
state = OperationState.PENDING.toString,
- resource = batchRequest.getResource,
- className = batchRequest.getClassName,
+ resource = resource,
+ className = className,
requestName = name.orNull,
requestConf = optimizedConf,
- requestArgs = batchRequest.getArgs.asScala,
+ requestArgs = batchArgs,
createTime = createTime,
- engineType = batchRequest.getBatchType,
+ engineType = batchType,
clusterManager = batchJobSubmissionOp.builder.clusterManager())
// there is a chance that operation failed w/ duplicated key error
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index b0ed144a5..0ef3f1ac1 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -124,23 +124,38 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
}
}
- private def createBatchSession(
+ // scalastyle:off
+ def createBatchSession(
user: String,
password: String,
ipAddress: String,
conf: Map[String, String],
- batchRequest: BatchRequest,
- recoveryMetadata: Option[Metadata] = None): KyuubiBatchSessionImpl = {
+ batchType: String,
+ batchName: Option[String],
+ resource: String,
+ className: String,
+ batchConf: Map[String, String],
+ batchArgs: Seq[String],
+ recoveryMetadata: Option[Metadata] = None,
+ shouldRunAsync: Boolean): KyuubiBatchSessionImpl = {
+ // scalastyle:on
val username = Option(user).filter(_.nonEmpty).getOrElse("anonymous")
+ val sessionConf = this.getConf.getUserDefaults(user)
new KyuubiBatchSessionImpl(
username,
password,
ipAddress,
conf,
this,
- this.getConf.getUserDefaults(user),
- batchRequest,
- recoveryMetadata)
+ sessionConf,
+ batchType,
+ batchName,
+ resource,
+ className,
+ batchConf,
+ batchArgs,
+ recoveryMetadata,
+ shouldRunAsync)
}
private[kyuubi] def openBatchSession(batchSession: KyuubiBatchSessionImpl):
SessionHandle = {
@@ -178,8 +193,21 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
password: String,
ipAddress: String,
conf: Map[String, String],
- batchRequest: BatchRequest): SessionHandle = {
- val batchSession = createBatchSession(user, password, ipAddress, conf,
batchRequest)
+ batchRequest: BatchRequest,
+ shouldRunAsync: Boolean = true): SessionHandle = {
+ val batchSession = createBatchSession(
+ user,
+ password,
+ ipAddress,
+ conf,
+ batchRequest.getBatchType,
+ Option(batchRequest.getName),
+ batchRequest.getResource,
+ batchRequest.getClassName,
+ batchRequest.getConf.asScala.toMap,
+ batchRequest.getArgs.asScala,
+ None,
+ shouldRunAsync)
openBatchSession(batchSession)
}
@@ -246,21 +274,19 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
kyuubiInstance,
0,
Int.MaxValue).map { metadata =>
- val batchRequest = new BatchRequest(
- metadata.engineType,
- metadata.resource,
- metadata.className,
- metadata.requestName,
- metadata.requestConf.asJava,
- metadata.requestArgs.asJava)
-
createBatchSession(
metadata.username,
"anonymous",
metadata.ipAddress,
metadata.requestConf,
- batchRequest,
- Some(metadata))
+ metadata.engineType,
+ Option(metadata.requestName),
+ metadata.resource,
+ metadata.className,
+ metadata.requestConf,
+ metadata.requestArgs,
+ Some(metadata),
+ shouldRunAsync = true)
}).getOrElse(Seq.empty)
}
}