This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 1554032 [KYUUBI #1346] Support launch query engine asynchronously
during opening session
1554032 is described below
commit 1554032df2576695f6345232dc61c0162644aec3
Author: fwang12 <[email protected]>
AuthorDate: Sat Nov 13 17:25:57 2021 +0800
[KYUUBI #1346] Support launch query engine asynchronously during opening
session
<!--
Thanks for sending a pull request!
Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://kyuubi.readthedocs.io/en/latest/community/contributions.html
2. If the PR is related to an issue in
https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your
PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
3. If the PR is unfinished, add '[WIP]' in your PR title, e.g.,
'[WIP][KYUUBI #XXXX] Your PR title ...'.
-->
### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
1. If you add a feature, you can talk about the use case of it.
2. If you fix a bug, you can clarify why it is a bug.
-->
Support launch query engine asynchronously during opening session, make it
more user-friendly.
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests)
locally before make a pull request
Closes #1346 from turboFei/kyuubi_hive_jdbc_ext.
Closes #1346
513691d6 [fwang12] address comments
63b8f901 [fwang12] remove hanle to client
4590ad98 [fwang12] remove synchronized
f3b136b4 [fwang12] add ut for open session with failure
2cae9afa [fwang12] address comments
35113d0f [fwang12] add engine session id to session event
34c27f9c [fwang12] refactor log
b5f7e639 [fwang12] block
b8b0cf9d [fwang12] to support aync client
66eb07f3 [fwang12] address comments
623c0511 [fwang12] volatile
282c9f07 [fwang12] refactor conf
5047d91d [fwang12] server session handle is not same with that of engine
dc1cb8bc [fwang12] update docs
b773fd9d [fwang12] refactor
5f657e44 [fwang12] add ut
9e56cbab [fwang12] exclude engine init engine
6abdb52c [fwang12] refactor
d3e37a8c [fwang12] block until engine finished
085b324d [fwang12] save
Authored-by: fwang12 <[email protected]>
Signed-off-by: fwang12 <[email protected]>
---
docs/deployment/settings.md | 1 +
.../org/apache/kyuubi/config/KyuubiConf.scala | 10 +++
.../apache/kyuubi/operation/OperationType.scala | 5 +-
.../kyuubi/operation/HiveJDBCTestHelper.scala | 3 +
.../apache/kyuubi/events/KyuubiSessionEvent.scala | 2 +
.../apache/kyuubi/operation/ExecuteStatement.scala | 4 +-
.../org/apache/kyuubi/operation/GetCatalogs.scala | 4 +-
.../org/apache/kyuubi/operation/GetColumns.scala | 4 +-
.../org/apache/kyuubi/operation/GetFunctions.scala | 4 +-
.../org/apache/kyuubi/operation/GetSchemas.scala | 4 +-
.../apache/kyuubi/operation/GetTableTypes.scala | 7 +-
.../org/apache/kyuubi/operation/GetTables.scala | 4 +-
.../org/apache/kyuubi/operation/GetTypeInfo.scala | 4 +-
.../apache/kyuubi/operation/KyuubiOperation.scala | 11 ++-
.../kyuubi/operation/KyuubiOperationManager.scala | 56 ++++----------
.../org/apache/kyuubi/operation/LaunchEngine.scala | 64 +++++++++++++++
.../apache/kyuubi/session/KyuubiSessionImpl.scala | 90 +++++++++++++++-------
.../kyuubi/events/EventLoggingServiceSuite.scala | 6 +-
.../KyuubiOperationPerConnectionSuite.scala | 35 +++++++++
.../operation/KyuubiOperationPerUserSuite.scala | 18 +++++
20 files changed, 232 insertions(+), 104 deletions(-)
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index bd213a1..fa658cd 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -295,6 +295,7 @@ kyuubi\.session\.conf<br>\.restrict\.list|<div
style='width: 65pt;word-wrap: bre
kyuubi\.session\.engine<br>\.check\.interval|<div style='width:
65pt;word-wrap: break-word;white-space: normal'>PT5M</div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>The check interval for engine
timeout</div>|<div style='width: 30pt'>duration</div>|<div style='width:
20pt'>1.0.0</div>
kyuubi\.session\.engine<br>\.idle\.timeout|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'>PT30M</div>|<div style='width: 170pt;word-wrap:
break-word;white-space: normal'>engine timeout, the engine will self-terminate
when it's not accessed for this duration</div>|<div style='width:
30pt'>duration</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.session\.engine<br>\.initialize\.timeout|<div style='width:
65pt;word-wrap: break-word;white-space: normal'>PT3M</div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>Timeout for starting the
background engine, e.g. SparkSQLEngine.</div>|<div style='width:
30pt'>duration</div>|<div style='width: 20pt'>1.0.0</div>
+kyuubi\.session\.engine<br>\.launch\.async|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'>false</div>|<div style='width: 170pt;word-wrap:
break-word;white-space: normal'>When opening kyuubi session, whether to launch
backend engine asynchronously. When true, the Kyuubi server will set up the
connection with the client without delay as the backend engine will be created
asynchronously.</div>|<div style='width: 30pt'>boolean</div>|<div style='width:
20pt'>1.4.0</div>
kyuubi\.session\.engine<br>\.log\.timeout|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'>PT24H</div>|<div style='width: 170pt;word-wrap:
break-word;white-space: normal'>If we use Spark as the engine then the session
submit log is the console output of spark-submit. We will retain the session
submit log until over the config value.</div>|<div style='width:
30pt'>duration</div>|<div style='width: 20pt'>1.1.0</div>
kyuubi\.session\.engine<br>\.login\.timeout|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'>PT15S</div>|<div style='width: 170pt;word-wrap:
break-word;white-space: normal'>The timeout of creating the connection to
remote sql query engine</div>|<div style='width: 30pt'>duration</div>|<div
style='width: 20pt'>1.0.0</div>
kyuubi\.session\.engine<br>\.request\.timeout|<div style='width:
65pt;word-wrap: break-word;white-space: normal'>PT1M</div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>The timeout of awaiting
response after sending request to remote sql query engine</div>|<div
style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.4.0</div>
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 0a3eac3..a5c309c 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -657,6 +657,16 @@ object KyuubiConf {
.checkValue(_ > 0, "the maximum must be positive integer.")
.createWithDefault(10)
+ // TODO: make it true by default
+ val SESSION_ENGINE_LAUNCH_ASYNC: ConfigEntry[Boolean] =
+ buildConf("session.engine.launch.async")
+ .doc("When opening kyuubi session, whether to launch backend engine
asynchronously." +
+ " When true, the Kyuubi server will set up the connection with the
client without delay" +
+ " as the backend engine will be created asynchronously.")
+ .version("1.4.0")
+ .booleanConf
+ .createWithDefault(false)
+
val SERVER_EXEC_POOL_SIZE: ConfigEntry[Int] =
buildConf("backend.server.exec.pool.size")
.doc("Number of threads in the operation execution thread pool of Kyuubi
server")
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationType.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationType.scala
index e4686aa..abc7bf3 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationType.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationType.scala
@@ -30,7 +30,8 @@ object OperationType extends Enumeration {
GET_TABLES,
GET_TABLE_TYPES,
GET_COLUMNS,
- GET_FUNCTIONS = Value
+ GET_FUNCTIONS,
+ LAUNCH_ENGINE = Value
def getOperationType(from: TOperationType): OperationType = {
from match {
@@ -57,6 +58,8 @@ object OperationType extends Enumeration {
case GET_TABLE_TYPES => TOperationType.GET_TABLE_TYPES
case GET_COLUMNS => TOperationType.GET_COLUMNS
case GET_FUNCTIONS => TOperationType.GET_FUNCTIONS
+ // LAUNCH_ENGINE is not an OperationType defined in Hive thrift protocol
+ case LAUNCH_ENGINE => TOperationType.UNKNOWN
case other =>
throw new UnsupportedOperationException(s"Unsupported Operation type:
${other.toString}")
}
diff --git
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/HiveJDBCTestHelper.scala
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/HiveJDBCTestHelper.scala
index c21392a..44bdb65 100644
---
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/HiveJDBCTestHelper.scala
+++
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/HiveJDBCTestHelper.scala
@@ -20,6 +20,8 @@ package org.apache.kyuubi.operation
import java.sql.{DriverManager, ResultSet, SQLException, Statement}
import java.util.Locale
+import scala.collection.JavaConverters._
+
import org.apache.hive.service.rpc.thrift._
import org.apache.hive.service.rpc.thrift.TCLIService.Iface
import org.apache.hive.service.rpc.thrift.TOperationState._
@@ -154,6 +156,7 @@ trait HiveJDBCTestHelper extends KyuubiFunSuite {
val req = new TOpenSessionReq()
req.setUsername(user)
req.setPassword("anonymous")
+ req.setConfiguration(_sessionConfigs.asJava)
val resp = client.OpenSession(req)
val handle = resp.getSessionHandle
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiSessionEvent.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiSessionEvent.scala
index 44a9517..60f2286 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiSessionEvent.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiSessionEvent.scala
@@ -24,6 +24,7 @@ import org.apache.kyuubi.session.KyuubiSessionImpl
/**
* @param sessionId server session id
+ * @param remoteSessionId remote engine session id
* @param sessionName if user not specify it, we use empty string instead
* @param user session user
* @param clientIP client ip address
@@ -44,6 +45,7 @@ case class KyuubiSessionEvent(
conf: Map[String, String],
startTime: Long,
var sessionId: String = "",
+ var remoteSessionId: String = "",
var clientVersion: Int = -1,
var openedTime: Long = -1L,
var endTime: Long = -1L,
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
index 643080a..82a14f3 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
@@ -24,7 +24,6 @@ import org.apache.hive.service.rpc.thrift.TOperationState._
import org.apache.thrift.TException
import org.apache.kyuubi.KyuubiSQLException
-import org.apache.kyuubi.client.KyuubiSyncThriftClient
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.events.KyuubiStatementEvent
import org.apache.kyuubi.metrics.MetricsConstants._
@@ -37,11 +36,10 @@ import org.apache.kyuubi.session.{KyuubiSessionImpl,
KyuubiSessionManager, Sessi
class ExecuteStatement(
session: Session,
- client: KyuubiSyncThriftClient,
override val statement: String,
override val shouldRunAsync: Boolean,
queryTimeout: Long)
- extends KyuubiOperation(OperationType.EXECUTE_STATEMENT, session, client) {
+ extends KyuubiOperation(OperationType.EXECUTE_STATEMENT, session) {
EventLoggingService.onEvent(KyuubiStatementEvent(this))
private final val _operationLog: OperationLog = if (shouldRunAsync) {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetCatalogs.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetCatalogs.scala
index f46dc7a..7553373 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetCatalogs.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetCatalogs.scala
@@ -17,11 +17,9 @@
package org.apache.kyuubi.operation
-import org.apache.kyuubi.client.KyuubiSyncThriftClient
import org.apache.kyuubi.session.Session
-class GetCatalogs(session: Session, client: KyuubiSyncThriftClient)
- extends KyuubiOperation(OperationType.GET_CATALOGS, session, client) {
+class GetCatalogs(session: Session) extends
KyuubiOperation(OperationType.GET_CATALOGS, session) {
override protected def runInternal(): Unit = {
try {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetColumns.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetColumns.scala
index f732029..705372c 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetColumns.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetColumns.scala
@@ -17,17 +17,15 @@
package org.apache.kyuubi.operation
-import org.apache.kyuubi.client.KyuubiSyncThriftClient
import org.apache.kyuubi.session.Session
class GetColumns(
session: Session,
- client: KyuubiSyncThriftClient,
catalogName: String,
schemaName: String,
tableName: String,
columnName: String)
- extends KyuubiOperation(OperationType.GET_COLUMNS, session, client) {
+ extends KyuubiOperation(OperationType.GET_COLUMNS, session) {
override protected def runInternal(): Unit = {
try {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetFunctions.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetFunctions.scala
index d8c6bcb..ecc475c 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetFunctions.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetFunctions.scala
@@ -17,16 +17,14 @@
package org.apache.kyuubi.operation
-import org.apache.kyuubi.client.KyuubiSyncThriftClient
import org.apache.kyuubi.session.Session
class GetFunctions(
session: Session,
- client: KyuubiSyncThriftClient,
catalogName: String,
schemaName: String,
functionName: String)
- extends KyuubiOperation(OperationType.GET_FUNCTIONS, session, client) {
+ extends KyuubiOperation(OperationType.GET_FUNCTIONS, session) {
override protected def runInternal(): Unit = {
try {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetSchemas.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetSchemas.scala
index 236e801..b6d3d54 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetSchemas.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetSchemas.scala
@@ -17,15 +17,13 @@
package org.apache.kyuubi.operation
-import org.apache.kyuubi.client.KyuubiSyncThriftClient
import org.apache.kyuubi.session.Session
class GetSchemas(
session: Session,
- client: KyuubiSyncThriftClient,
catalogName: String,
schemaName: String)
- extends KyuubiOperation(OperationType.GET_SCHEMAS, session, client) {
+ extends KyuubiOperation(OperationType.GET_SCHEMAS, session) {
override protected def runInternal(): Unit = {
try {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetTableTypes.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetTableTypes.scala
index 0eea8da..8ed42e7 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetTableTypes.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetTableTypes.scala
@@ -17,13 +17,10 @@
package org.apache.kyuubi.operation
-import org.apache.kyuubi.client.KyuubiSyncThriftClient
import org.apache.kyuubi.session.Session
-class GetTableTypes(
- session: Session,
- client: KyuubiSyncThriftClient)
- extends KyuubiOperation(OperationType.GET_TABLE_TYPES, session, client) {
+class GetTableTypes(session: Session) extends
+ KyuubiOperation(OperationType.GET_TABLE_TYPES, session) {
override protected def runInternal(): Unit = {
try {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetTables.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetTables.scala
index 4400f4d..697c7d6 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetTables.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetTables.scala
@@ -17,17 +17,15 @@
package org.apache.kyuubi.operation
-import org.apache.kyuubi.client.KyuubiSyncThriftClient
import org.apache.kyuubi.session.Session
class GetTables(
session: Session,
- client: KyuubiSyncThriftClient,
catalogName: String,
schemaName: String,
tableName: String,
tableTypes: java.util.List[String])
- extends KyuubiOperation(OperationType.GET_TABLES, session, client) {
+ extends KyuubiOperation(OperationType.GET_TABLES, session) {
override protected def runInternal(): Unit = {
try {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetTypeInfo.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetTypeInfo.scala
index c98a238..4a1b605 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetTypeInfo.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetTypeInfo.scala
@@ -17,11 +17,9 @@
package org.apache.kyuubi.operation
-import org.apache.kyuubi.client.KyuubiSyncThriftClient
import org.apache.kyuubi.session.Session
-class GetTypeInfo(session: Session, client: KyuubiSyncThriftClient)
- extends KyuubiOperation(OperationType.GET_TYPE_INFO, session, client) {
+class GetTypeInfo(session: Session) extends
KyuubiOperation(OperationType.GET_TYPE_INFO, session) {
override protected def runInternal(): Unit = {
try {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
index a69814d..83922c4 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
@@ -26,18 +26,17 @@ import org.apache.thrift.TException
import org.apache.thrift.transport.TTransportException
import org.apache.kyuubi.{KyuubiSQLException, Utils}
-import org.apache.kyuubi.client.KyuubiSyncThriftClient
import org.apache.kyuubi.metrics.MetricsConstants.STATEMENT_FAIL
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.operation.OperationType.OperationType
-import org.apache.kyuubi.session.Session
+import org.apache.kyuubi.session.{KyuubiSessionImpl, Session}
import org.apache.kyuubi.util.ThriftUtils
-abstract class KyuubiOperation(
- opType: OperationType,
- session: Session,
- client: KyuubiSyncThriftClient) extends AbstractOperation(opType, session)
{
+abstract class KyuubiOperation(opType: OperationType, session: Session)
+ extends AbstractOperation(opType, session) {
+
+ protected[operation] lazy val client =
session.asInstanceOf[KyuubiSessionImpl].client
@volatile protected var _remoteOpHandle: TOperationHandle = _
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
index 35d7076..4af1c35 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
@@ -17,24 +17,20 @@
package org.apache.kyuubi.operation
-import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+import java.util.concurrent.TimeUnit
import org.apache.hive.service.rpc.thrift.TRowSet
-import org.apache.kyuubi.KyuubiSQLException
-import org.apache.kyuubi.client.KyuubiSyncThriftClient
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.OPERATION_QUERY_TIMEOUT
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
-import org.apache.kyuubi.session.{Session, SessionHandle}
+import org.apache.kyuubi.session.{KyuubiSessionImpl, Session}
import org.apache.kyuubi.util.ThriftUtils
class KyuubiOperationManager private (name: String) extends
OperationManager(name) {
def this() = this(classOf[KyuubiOperationManager].getSimpleName)
- private val handleToClient = new ConcurrentHashMap[SessionHandle,
KyuubiSyncThriftClient]()
-
private var queryTimeout: Option[Long] = None
override def initialize(conf: KyuubiConf): Unit = {
@@ -42,14 +38,6 @@ class KyuubiOperationManager private (name: String) extends
OperationManager(nam
super.initialize(conf)
}
- private def getThriftClient(sessionHandle: SessionHandle):
KyuubiSyncThriftClient = {
- val client = handleToClient.get(sessionHandle)
- if (client == null) {
- throw KyuubiSQLException(s"$sessionHandle has not been initialized or
already been closed")
- }
- client
- }
-
private def getQueryTimeout(clientQueryTimeout: Long): Long = {
// If clientQueryTimeout is smaller than systemQueryTimeout value,
// we use the clientQueryTimeout value.
@@ -61,34 +49,23 @@ class KyuubiOperationManager private (name: String) extends
OperationManager(nam
}
}
- def setConnection(sessionHandle: SessionHandle, client:
KyuubiSyncThriftClient): Unit = {
- handleToClient.put(sessionHandle, client)
- }
-
- def removeConnection(sessionHandle: SessionHandle): Unit = {
- handleToClient.remove(sessionHandle)
- }
-
override def newExecuteStatementOperation(
session: Session,
statement: String,
runAsync: Boolean,
queryTimeout: Long): Operation = {
- val client = getThriftClient(session.handle)
- val operation = new ExecuteStatement(session, client, statement, runAsync,
+ val operation = new ExecuteStatement(session, statement, runAsync,
getQueryTimeout(queryTimeout))
addOperation(operation)
}
override def newGetTypeInfoOperation(session: Session): Operation = {
- val client = getThriftClient(session.handle)
- val operation = new GetTypeInfo(session, client)
+ val operation = new GetTypeInfo(session)
addOperation(operation)
}
override def newGetCatalogsOperation(session: Session): Operation = {
- val client = getThriftClient(session.handle)
- val operation = new GetCatalogs(session, client)
+ val operation = new GetCatalogs(session)
addOperation(operation)
}
@@ -96,8 +73,7 @@ class KyuubiOperationManager private (name: String) extends
OperationManager(nam
session: Session,
catalog: String,
schema: String): Operation = {
- val client = getThriftClient(session.handle)
- val operation = new GetSchemas(session, client, catalog, schema)
+ val operation = new GetSchemas(session, catalog, schema)
addOperation(operation)
}
@@ -107,15 +83,12 @@ class KyuubiOperationManager private (name: String)
extends OperationManager(nam
schemaName: String,
tableName: String,
tableTypes: java.util.List[String]): Operation = {
- val client = getThriftClient(session.handle)
- val operation = new GetTables(
- session, client, catalogName, schemaName, tableName, tableTypes)
+ val operation = new GetTables(session, catalogName, schemaName, tableName,
tableTypes)
addOperation(operation)
}
override def newGetTableTypesOperation(session: Session): Operation = {
- val client = getThriftClient(session.handle)
- val operation = new GetTableTypes(session, client)
+ val operation = new GetTableTypes(session)
addOperation(operation)
}
@@ -125,8 +98,7 @@ class KyuubiOperationManager private (name: String) extends
OperationManager(nam
schemaName: String,
tableName: String,
columnName: String): Operation = {
- val client = getThriftClient(session.handle)
- val operation = new GetColumns(session, client, catalogName, schemaName,
tableName, columnName)
+ val operation = new GetColumns(session, catalogName, schemaName,
tableName, columnName)
addOperation(operation)
}
@@ -135,8 +107,12 @@ class KyuubiOperationManager private (name: String)
extends OperationManager(nam
catalogName: String,
schemaName: String,
functionName: String): Operation = {
- val client = getThriftClient(session.handle)
- val operation = new GetFunctions(session, client, catalogName, schemaName,
functionName)
+ val operation = new GetFunctions(session, catalogName, schemaName,
functionName)
+ addOperation(operation)
+ }
+
+ def newLaunchEngineOperation(session: KyuubiSessionImpl, shouldRunAsync:
Boolean): Operation = {
+ val operation = new LaunchEngine(session, shouldRunAsync)
addOperation(operation)
}
@@ -150,7 +126,7 @@ class KyuubiOperationManager private (name: String) extends
OperationManager(nam
case Some(log) => log.read(maxRows)
case None =>
val remoteHandle = operation.remoteOpHandle()
- val client = getThriftClient(operation.getSession.handle)
+ val client = operation.client
if (remoteHandle != null) {
client.fetchResults(remoteHandle, order, maxRows, fetchLog = true)
} else {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala
new file mode 100644
index 0000000..9b89481
--- /dev/null
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.operation
+
+import org.apache.kyuubi.operation.log.OperationLog
+import org.apache.kyuubi.session.KyuubiSessionImpl
+
+class LaunchEngine(session: KyuubiSessionImpl, override val shouldRunAsync:
Boolean) extends
+ KyuubiOperation(OperationType.LAUNCH_ENGINE, session) {
+
+ private lazy val _operationLog: OperationLog = if (shouldRunAsync) {
+ OperationLog.createOperationLog(session, getHandle)
+ } else {
+ // when launch engine synchronously, operation log is not needed
+ null
+ }
+ override def getOperationLog: Option[OperationLog] = Option(_operationLog)
+
+ override protected def beforeRun(): Unit = {
+ OperationLog.setCurrentOperationLog(_operationLog)
+ setHasResultSet(false)
+ setState(OperationState.PENDING)
+ }
+
+ override protected def afterRun(): Unit = {
+ OperationLog.removeCurrentOperationLog()
+ }
+
+ override protected def runInternal(): Unit = {
+ val asyncOperation: Runnable = () => {
+ setState(OperationState.RUNNING)
+ try {
+ session.openEngineSession()
+ setState(OperationState.FINISHED)
+ } catch {
+ onError()
+ } finally {
+ // TODO: delay to close it for async mode to enable client to get more
launch engine log
+ session.closeOperation(getHandle)
+ }
+ }
+ try {
+ val opHandle =
session.sessionManager.submitBackgroundOperation(asyncOperation)
+ setBackgroundHandle(opHandle)
+ } catch onError("submitting open engine operation in background, request
rejected")
+
+ if (!shouldRunAsync) getBackgroundHandle.get()
+ }
+}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index d847b0c..469f06c 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -42,7 +42,7 @@ class KyuubiSessionImpl(
password: String,
ipAddress: String,
conf: Map[String, String],
- sessionManager: KyuubiSessionManager,
+ override val sessionManager: KyuubiSessionManager,
sessionConf: KyuubiConf)
extends AbstractSession(protocol, user, password, ipAddress, conf,
sessionManager) {
@@ -54,62 +54,94 @@ class KyuubiSessionImpl(
}
val engine: EngineRef = new EngineRef(sessionConf, user)
+ val launchEngineAsync = sessionConf.get(SESSION_ENGINE_LAUNCH_ASYNC)
+ private val launchEngineOp =
sessionManager.operationManager.newLaunchEngineOperation(
+ this, launchEngineAsync)
+ @volatile
+ var engineLaunched: Boolean = false
private val sessionEvent = KyuubiSessionEvent(this)
EventLoggingService.onEvent(sessionEvent)
private var transport: TTransport = _
- private var client: KyuubiSyncThriftClient = _
+ private var _client: KyuubiSyncThriftClient = _
+ def client: KyuubiSyncThriftClient = _client
- private var _handle: SessionHandle = _
- override def handle: SessionHandle = _handle
+ override val handle: SessionHandle = SessionHandle(protocol)
+ private var _engineSessionHandle: SessionHandle = _
override def open(): Unit = {
MetricsSystem.tracing { ms =>
ms.incCount(CONN_TOTAL)
ms.incCount(MetricRegistry.name(CONN_OPEN, user))
}
- withZkClient(sessionConf) { zkClient =>
- val (host, port) = engine.getOrCreate(zkClient)
- openSession(host, port)
- }
- // we should call super.open after kyuubi session is already opened
+
+ // we should call super.open before running launch engine operation
super.open()
+
+ runOperation(launchEngineOp)
}
- private def openSession(host: String, port: Int): Unit = {
- val passwd = Option(password).filter(_.nonEmpty).getOrElse("anonymous")
- val loginTimeout = sessionConf.get(ENGINE_LOGIN_TIMEOUT).toInt
- val requestTimeout = sessionConf.get(ENGINE_REQUEST_TIMEOUT).toInt
- transport = PlainSASLHelper.getPlainTransport(
- user, passwd, new TSocket(host, port, requestTimeout, loginTimeout))
- if (!transport.isOpen) {
- transport.open()
- logSessionInfo(s"Connected to engine [$host:$port]")
+ private[kyuubi] def openEngineSession(): Unit = {
+ withZkClient(sessionConf) { zkClient =>
+ val (host, port) = engine.getOrCreate(zkClient)
+ val passwd = Option(password).filter(_.nonEmpty).getOrElse("anonymous")
+ val loginTimeout = sessionConf.get(ENGINE_LOGIN_TIMEOUT).toInt
+ val requestTimeout = sessionConf.get(ENGINE_REQUEST_TIMEOUT).toInt
+ transport = PlainSASLHelper.getPlainTransport(
+ user, passwd, new TSocket(host, port, requestTimeout, loginTimeout))
+ if (!transport.isOpen) {
+ transport.open()
+ logSessionInfo(s"Connected to engine [$host:$port]")
+ }
+ _client = new KyuubiSyncThriftClient(new TBinaryProtocol(transport))
+ _engineSessionHandle = _client.openSession(protocol, user, passwd,
normalizedConf)
+ logSessionInfo(s"Opened engine session[${_engineSessionHandle}]")
+ sessionEvent.openedTime = System.currentTimeMillis()
+ sessionEvent.sessionId = handle.identifier.toString
+ sessionEvent.remoteSessionId = _engineSessionHandle.identifier.toString
+ sessionEvent.clientVersion = handle.protocol.getValue
+ EventLoggingService.onEvent(sessionEvent)
}
- client = new KyuubiSyncThriftClient(new TBinaryProtocol(transport))
- // use engine SessionHandle directly
- _handle = client.openSession(protocol, user, passwd, normalizedConf)
- sessionManager.operationManager.setConnection(handle, client)
- sessionEvent.openedTime = System.currentTimeMillis()
- sessionEvent.sessionId = handle.identifier.toString
- sessionEvent.clientVersion = handle.protocol.getValue
- EventLoggingService.onEvent(sessionEvent)
}
override protected def runOperation(operation: Operation): OperationHandle =
{
- sessionEvent.totalOperations += 1
+ if (operation != launchEngineOp) {
+ waitForEngineLaunched()
+ sessionEvent.totalOperations += 1
+ }
super.runOperation(operation)
}
+ private def waitForEngineLaunched(): Unit = {
+ if (!engineLaunched) {
+ Option(launchEngineOp).foreach { op =>
+ val waitingStartTime = System.currentTimeMillis()
+ logSessionInfo(s"Starting to wait the launch engine operation
finished")
+
+ op.getBackgroundHandle.get()
+
+ val elapsedTime = System.currentTimeMillis() - waitingStartTime
+ logSessionInfo(s"Engine has been launched, elapsed time: ${elapsedTime
/ 1000} s")
+
+ if (_engineSessionHandle == null) {
+ val ex = op.getStatus.exception.getOrElse(
+ KyuubiSQLException(s"Failed to launch engine for
session[$handle]"))
+ throw ex
+ }
+
+ engineLaunched = true
+ }
+ }
+ }
+
override def close(): Unit = {
super.close()
if (handle != null) {
- sessionManager.operationManager.removeConnection(handle)
sessionManager.credentialsManager.removeSessionCredentialsEpoch(handle.identifier.toString)
}
try {
- if (client != null) client.closeSession()
+ if (_client != null) _client.closeSession()
} catch {
case e: TException =>
throw KyuubiSQLException("Error while cleaning up the engine
resources", e)
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala
index 7739861..b2fb9a7 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala
@@ -107,11 +107,13 @@ class EventLoggingServiceSuite extends WithKyuubiServer
with HiveJDBCTestHelper
assert(res.getString("user") == Utils.currentUser)
assert(res.getString("sessionName") == "test1")
assert(res.getString("sessionId") == "")
+ assert(res.getString("remoteSessionId") == "")
assert(res.getLong("startTime") > 0)
assert(res.getInt("totalOperations") == 0)
assert(res.next())
assert(res.getInt("totalOperations") == 0)
assert(res.getString("sessionId") != "")
+ assert(res.getString("remoteSessionId") != "")
assert(res.getLong("openedTime") > 0)
assert(res.next())
assert(res.getInt("totalOperations") == 1)
@@ -121,7 +123,7 @@ class EventLoggingServiceSuite extends WithKyuubiServer
with HiveJDBCTestHelper
}
}
- test("engine session id should be same with server session id") {
+ test("engine session id is not same with server session id") {
val name = UUID.randomUUID().toString
withSessionConf()(Map.empty)(Map(KyuubiConf.SESSION_NAME.key -> name)) {
withJdbcStatement() { statement =>
@@ -145,7 +147,7 @@ class EventLoggingServiceSuite extends WithKyuubiServer
with HiveJDBCTestHelper
val res2 = statement.executeQuery(
s"SELECT * FROM `json`.`$engineSessionEventPath` " +
s"where sessionId = '$serverSessionId' limit 1")
- assert(res2.next())
+ assert(!res2.next())
}
}
}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
index 0a16da9..af3062d 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
@@ -97,4 +97,39 @@ class KyuubiOperationPerConnectionSuite extends
WithKyuubiServer with HiveJDBCTe
"Caused by: java.net.SocketException: Broken pipe (Write failed)")
}
}
+
+ test("test asynchronous open kyuubi session") {
+ withSessionConf(Map(
+ KyuubiConf.SESSION_ENGINE_LAUNCH_ASYNC.key -> "true"
+ ))(Map.empty)(Map.empty) {
+ withSessionHandle { (client, handle) =>
+ val executeStmtReq = new TExecuteStatementReq()
+ executeStmtReq.setStatement("select engine_name()")
+ executeStmtReq.setSessionHandle(handle)
+ executeStmtReq.setRunAsync(false)
+ val executeStmtResp = client.ExecuteStatement(executeStmtReq)
+ val getOpStatusReq = new
TGetOperationStatusReq(executeStmtResp.getOperationHandle)
+ val getOpStatusResp = client.GetOperationStatus(getOpStatusReq)
+ assert(getOpStatusResp.getStatus.getStatusCode ===
TStatusCode.SUCCESS_STATUS)
+ assert(getOpStatusResp.getOperationState ===
TOperationState.FINISHED_STATE)
+ }
+ }
+ }
+
+ test("test asynchronous open kyuubi session failure") {
+ withSessionConf(Map(
+ KyuubiConf.SESSION_ENGINE_LAUNCH_ASYNC.key -> "true",
+ "spark.master" -> "invalid"
+ ))(Map.empty)(Map.empty) {
+ withSessionHandle { (client, handle) =>
+ val executeStmtReq = new TExecuteStatementReq()
+ executeStmtReq.setStatement("select engine_name()")
+ executeStmtReq.setSessionHandle(handle)
+ executeStmtReq.setRunAsync(false)
+ val executeStmtResp = client.ExecuteStatement(executeStmtReq)
+ assert(executeStmtResp.getStatus.getStatusCode ==
TStatusCode.ERROR_STATUS)
+
assert(executeStmtResp.getStatus.getErrorMessage.contains("kyuubi-spark-sql-engine.log"))
+ }
+ }
+ }
}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
index a6aa3a3..ee31b27 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
@@ -65,6 +65,24 @@ class KyuubiOperationPerUserSuite extends WithKyuubiServer
with SparkQueryTests
assert(r1 === r2)
}
+ test("ensure open session asynchronously for USER mode still share the same
engine") {
+ withJdbcStatement() { statement =>
+ val resultSet = statement.executeQuery("SELECT engine_id()")
+ assert(resultSet.next())
+ val engineId = resultSet.getString(1)
+
+ withSessionConf(Map(
+ KyuubiConf.SESSION_ENGINE_LAUNCH_ASYNC.key -> "true"
+ ))(Map.empty)(Map.empty) {
+ withJdbcStatement() { stmt =>
+ val rs = stmt.executeQuery("SELECT engine_id()")
+ assert(rs.next())
+ assert(rs.getString(1) == engineId)
+ }
+ }
+ }
+ }
+
test("ensure two connections share the same engine when specifying
subDomain.") {
withSessionConf()(
Map(