This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 5421b56 [KYUUBI #1327] Fix socket timeout error when client sync execute statement cost time longer than engine.login.timeout
5421b56 is described below
commit 5421b56440cdd0d07de0a98d5a3dc0cfb9ef312e
Author: Cheng Pan <[email protected]>
AuthorDate: Thu Nov 4 18:23:10 2021 +0800
[KYUUBI #1327] Fix socket timeout error when client sync execute statement cost time longer than engine.login.timeout
### _Why are the changes needed?_
Fix #1327
#### Analysis
The typical error logs are:
```log
2021-11-01 11:27:54.018 INFO client.KyuubiSyncThriftClient: TCloseOperationReq(operationHandle:TOperationHandle(operationId:THandleIdentifier(guid:47 69 37 B3 13 38 48 DA 87 7A 8A B6 BD 22 FA 57, secret:C1 01 AE 0B 6F 5F 48 F1 9A F0 FD 84 E3 0F 2B 1E), operationType:EXECUTE_STATEMENT, hasResultSet:true)) succeed on engine side
2021-11-01 11:27:54.019 INFO operation.ExecuteStatement: Processing sy.yao's query[c581db45-67f2-4f15-ac0d-aaecdb713fe9]: INITIALIZED_STATE -> PENDING_STATE, statement: [...SQL]
2021-11-01 11:27:54.019 INFO operation.ExecuteStatement: Processing sy.yao's query[c581db45-67f2-4f15-ac0d-aaecdb713fe9]: PENDING_STATE -> RUNNING_STATE, statement: [...SQL]
2021-11-01 11:28:09.034 INFO operation.ExecuteStatement: Processing sy.yao's query[c581db45-67f2-4f15-ac0d-aaecdb713fe9]: RUNNING_STATE -> ERROR_STATE, statement: [...SQL], time taken: 15.015 seconds
2021-11-01 11:28:09.035 INFO operation.ExecuteStatement: Processing sy.yao's query[c581db45-67f2-4f15-ac0d-aaecdb713fe9]: ERROR_STATE -> CLOSED_STATE, statement: [...SQL]
2021-11-01 11:28:09.035 WARN server.KyuubiThriftBinaryFrontendService: Error executing statement:
org.apache.kyuubi.KyuubiSQLException: Error operating EXECUTE_STATEMENT: org.apache.thrift.transport.TTransportException: java.net.SocketTimeoutException: Read timed out
[...omit detail stacktrace here]
```
The key points here are:
1. The client executes the query in sync mode. (The Hive JDBC client has used async mode since 2.1.0, see [HIVE-6535](https://issues.apache.org/jira/browse/HIVE-6535).)
2. The query takes longer than `kyuubi.session.engine.login.timeout`, which defaults to `15s`.
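The timeouts involved are duration settings. The settings table in this patch lists their defaults as ISO-8601 strings: `PT15S` for the login timeout and `PT1M` for the new request timeout. `KyuubiConf` stores them as milliseconds. The sketch below shows that conversion using only `java.time.Duration`. `TimeoutConfDemo` is a hypothetical helper, not a Kyuubi API. Its `Int` range check is also an addition of this sketch; `KyuubiSessionImpl` simply calls `.toInt` before passing the values to `TSocket`.

```scala
import java.time.Duration

// Hypothetical helper (not a Kyuubi API) for ISO-8601 duration settings
// such as PT15S or PT1M, as listed in the settings table.
object TimeoutConfDemo {
  // e.g. "PT15S" -> 15000 ms, "PT1M" -> 60000 ms
  def toMillis(iso: String): Long = Duration.parse(iso).toMillis

  // TSocket takes Int milliseconds; this sketch rejects values that would
  // overflow an Int (about 24.8 days) instead of truncating them silently.
  def toSocketTimeout(iso: String): Int = {
    val ms = toMillis(iso)
    require(ms <= Int.MaxValue, s"$iso is too large for an Int timeout in ms")
    ms.toInt
  }
}
```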
The Kyuubi server creates its Thrift clients using `kyuubi.session.engine.login.timeout` as both `socket_timeout` and `connect_timeout`, and the Thrift protocol layer has no heartbeat/keepalive mechanism. Thus, if the engine does not send a response to the Kyuubi server within `socket_timeout`, the Thrift client (the Kyuubi server) fails with `SocketTimeoutException: Read timed out`.
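This failure mode can be reproduced with plain JDK sockets, without Thrift. In the sketch below:

- A loopback `ServerSocket` stands in for an engine that accepts the connection but never replies.
- The timeout passed to `Socket.connect` plays the role of `connect_timeout`.
- `setSoTimeout` plays the role of `socket_timeout`.

`ReadTimeoutDemo` is a hypothetical name, and this is an illustration rather than Kyuubi code.

```scala
import java.net.{InetAddress, InetSocketAddress, ServerSocket, Socket, SocketTimeoutException}

object ReadTimeoutDemo {
  // Returns true if the client's read gives up with SocketTimeoutException.
  def readTimesOut(socketTimeoutMs: Int, connectTimeoutMs: Int = 1000): Boolean = {
    val loopback = InetAddress.getLoopbackAddress
    // Accepts connections (via the OS backlog) but never writes a byte.
    val silentEngine = new ServerSocket(0, 50, loopback)
    val client = new Socket()
    try {
      client.connect(new InetSocketAddress(loopback, silentEngine.getLocalPort), connectTimeoutMs)
      client.setSoTimeout(socketTimeoutMs) // analogous to socket_timeout
      client.getInputStream.read()         // blocks: no response ever arrives
      false
    } catch {
      case _: SocketTimeoutException => true
    } finally {
      client.close()
      silentEngine.close()
    }
  }
}
```

The read gives up after roughly `socketTimeoutMs`, typically with the message `Read timed out`. That matches the root cause in the log above.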
In sync mode, when a user sends an execute statement request to the Kyuubi server, the Kyuubi server forwards the request to the engine. The engine returns nothing until the execution finishes or some other error happens.
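From the server's point of view, a sync call is one blocking read bounded by `socket_timeout`. The hypothetical model below represents the engine's reply as a `CompletableFuture` and shows why that is a problem: a slow engine and a crashed engine produce exactly the same outcome, a timeout. `SyncCallModel` and its members are illustrative names, not Kyuubi or Thrift APIs.

```scala
import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}

object SyncCallModel {
  sealed trait Outcome
  case object Replied extends Outcome
  case object TimedOut extends Outcome

  // One blocking "sync" RPC: wait for the engine's reply for at most socketTimeoutMs.
  def syncCall(reply: CompletableFuture[String], socketTimeoutMs: Long): Outcome =
    try {
      reply.get(socketTimeoutMs, TimeUnit.MILLISECONDS)
      Replied
    } catch {
      case _: TimeoutException => TimedOut
    }

  // A healthy engine running a long query: it replies, but only after queryMs.
  def slowEngine(queryMs: Long): CompletableFuture[String] =
    CompletableFuture.supplyAsync[String](() => { Thread.sleep(queryMs); "result" })

  // A crashed engine: the reply never arrives.
  def deadEngine(): CompletableFuture[String] = new CompletableFuture[String]()
}
```

With a short timeout, `syncCall(slowEngine(...), ...)` and `syncCall(deadEngine(), ...)` both return `TimedOut`. The caller has no way to tell a long-running query from a dead engine.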
In async mode, the query request travels the same path, but the engine returns an `op_handle` immediately instead of waiting for the query to finish. The client then uses the `op_handle` to check the query status periodically. In detail:
1. The Kyuubi server asks the engine for the query status and keeps the result in memory.
2. The user asks the Kyuubi server for the query status, and the Kyuubi server returns the result from memory.
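The two polling loops can be modeled in a few lines. This is a simplified sketch under several assumptions:

- `AsyncFlowModel`, `Engine`, `Server`, and the two states are hypothetical names.
- The engine's work is just a sleep.
- The server runs one daemon poller thread per operation.

What the sketch shows is the structure. `executeAsync` returns a handle at once, the server polls the engine and caches the status (step 1), and clients read only the cache (step 2).

```scala
import java.util.UUID
import java.util.concurrent.{ConcurrentHashMap, Executors, Future}

object AsyncFlowModel {
  sealed trait State
  case object Running extends State
  case object Finished extends State

  // Engine side: executeAsync hands back an op handle immediately.
  final class Engine {
    private val pool = Executors.newCachedThreadPool()
    private val ops = new ConcurrentHashMap[UUID, Future[_]]()

    def executeAsync(queryMs: Long): UUID = {
      val handle = UUID.randomUUID()
      ops.put(handle, pool.submit(new Runnable { def run(): Unit = Thread.sleep(queryMs) }))
      handle
    }

    def getStatus(handle: UUID): State = if (ops.get(handle).isDone) Finished else Running

    def shutdown(): Unit = pool.shutdown()
  }

  // Server side: step 1 polls the engine and caches the status in memory,
  // step 2 answers clients from that cache only.
  final class Server(engine: Engine, pollMs: Long) {
    private val statusCache = new ConcurrentHashMap[UUID, State]()

    def execute(queryMs: Long): UUID = {
      val handle = engine.executeAsync(queryMs)
      statusCache.put(handle, Running)
      val poller = new Thread(new Runnable {
        def run(): Unit =
          while (statusCache.get(handle) == Running) {
            Thread.sleep(pollMs)
            statusCache.put(handle, engine.getStatus(handle)) // step 1
          }
      })
      poller.setDaemon(true)
      poller.start()
      handle
    }

    def clientGetStatus(handle: UUID): State = statusCache.get(handle) // step 2
  }
}
```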
#### Solution
Option 1: change the `socket_timeout` of Thrift clients created by the Kyuubi server to infinite. This approach may cause another issue: if the engine crashes while executing the query, the Kyuubi server will wait until the socket keepalive (controlled by the OS) times out. Until then, the query status remains `running` in the Kyuubi server's memory.
Option 2: always run the query in async mode on the engine, and simulate sync mode on the server side.
This PR implements option 2 and also introduces a new conf, `kyuubi.session.engine.request.timeout`.
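Option 2 follows the shape of the new `runInternal` in the diff:

1. Submit the statement to the engine with `runAsync = true`.
2. Hand the wait-for-completion loop to a background operation.
3. Block on that background handle only when the client asked for sync mode.

The model below reproduces that shape with `java.util.concurrent` executors. `SimulatedSyncModel` and its methods are hypothetical. Each loop iteration stands in for a short status RPC that, by assumption, returns well within `kyuubi.session.engine.request.timeout`.

```scala
import java.util.concurrent.{Callable, Executors, Future}

object SimulatedSyncModel {
  private val engine = Executors.newCachedThreadPool() // stands in for the remote engine
  private val server = Executors.newCachedThreadPool() // server-side background operations

  // Engine call with runAsync = true: returns a handle right away.
  private def engineExecuteAsync(queryMs: Long): Future[String] =
    engine.submit(new Callable[String] {
      def call(): String = { Thread.sleep(queryMs); "done" }
    })

  def runStatement(queryMs: Long, runAsync: Boolean, pollMs: Long): Future[String] = {
    val remoteHandle = engineExecuteAsync(queryMs)
    // Like waitStatementComplete: poll until the engine reports completion.
    val background = server.submit(new Callable[String] {
      def call(): String = {
        while (!remoteHandle.isDone) Thread.sleep(pollMs) // one short status check per loop
        remoteHandle.get()
      }
    })
    if (!runAsync) background.get() // sync client: block on the server's own future
    background
  }

  def shutdown(): Unit = { engine.shutdown(); server.shutdown() }
}
```

A sync client still waits for the whole query. However, it waits on the server's own future rather than on a socket read against the engine, so no single engine request has to outlive the request timeout.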
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #1328 from pan3793/timeout.
Closes #1327
cc15e6d5 [Cheng Pan] Always execute statement in async mode
dc91d0d5 [Cheng Pan] Add new conf `kyuubi.session.engine.request.timeout`
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
docs/deployment/settings.md | 3 +-
.../org/apache/kyuubi/config/KyuubiConf.scala | 8 ++++-
.../apache/kyuubi/operation/ExecuteStatement.scala | 36 ++++++++++------------
.../apache/kyuubi/session/KyuubiSessionImpl.scala | 3 +-
.../KyuubiOperationPerConnectionSuite.scala | 34 +++++++++++++++++++-
5 files changed, 60 insertions(+), 24 deletions(-)
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index d0e2bcb..77a81ac 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -289,7 +289,8 @@ kyuubi\.session\.engine<br>\.check\.interval|<div style='width: 65pt;word-wrap:
kyuubi\.session\.engine<br>\.idle\.timeout|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT30M</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>engine timeout, the engine will self-terminate when it's not accessed for this duration</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.session\.engine<br>\.initialize\.timeout|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT3M</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Timeout for starting the background engine, e.g. SparkSQLEngine.</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.session\.engine<br>\.log\.timeout|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT24H</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>If we use Spark as the engine then the session submit log is the console output of spark-submit. We will retain the session submit log until over the config value.</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.1.0</div>
-kyuubi\.session\.engine<br>\.login\.timeout|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT15S</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The timeout(ms) of creating the connection to remote sql query engine</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.0.0</div>
+kyuubi\.session\.engine<br>\.login\.timeout|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT15S</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The timeout of creating the connection to remote sql query engine</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.0.0</div>
+kyuubi\.session\.engine<br>\.request\.timeout|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT1M</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The timeout of awaiting response after sending request to remote sql query engine</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.4.0</div>
kyuubi\.session\.engine<br>\.share\.level|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>USER</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>(deprecated) - Using kyuubi.engine.share.level instead</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.session\.engine<br>\.spark\.main\.resource|<div style='width: 65pt;word-wrap: break-word;white-space: normal'><undefined></div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The package used to create Spark SQL engine remote application. If it is undefined, Kyuubi will use the default</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.session\.engine<br>\.startup\.error\.max<br>\.size|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>8192</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>During engine bootstrapping, if error occurs, using this config to limit the length error message(characters).</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.1.0</div>
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index ffdb7af..4735369 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -509,11 +509,17 @@ object KyuubiConf {
.createOptional
val ENGINE_LOGIN_TIMEOUT: ConfigEntry[Long] = buildConf("session.engine.login.timeout")
- .doc("The timeout(ms) of creating the connection to remote sql query engine")
+ .doc("The timeout of creating the connection to remote sql query engine")
.version("1.0.0")
.timeConf
.createWithDefault(Duration.ofSeconds(15).toMillis)
+ val ENGINE_REQUEST_TIMEOUT: ConfigEntry[Long] = buildConf("session.engine.request.timeout")
+ .doc("The timeout of awaiting response after sending request to remote sql query engine")
+ .version("1.4.0")
+ .timeConf
+ .createWithDefault(Duration.ofSeconds(60).toMillis)
+
val ENGINE_INIT_TIMEOUT: ConfigEntry[Long] = buildConf("session.engine.initialize.timeout")
.doc("Timeout for starting the background engine, e.g. SparkSQLEngine.")
.version("1.0.0")
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
index e3fdb9b..6df933b 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
@@ -77,7 +77,10 @@ class ExecuteStatement(
ms.incCount(STATEMENT_OPEN)
ms.incCount(STATEMENT_TOTAL)
}
- _remoteOpHandle = client.executeStatement(statement, shouldRunAsync, queryTimeout)
+ // We need to avoid executing query in sync mode, because there is no heartbeat mechanism
+ // in thrift protocol, in sync mode, we cannot distinguish between long-run query and
+ // engine crash without response before socket read timeout.
+ _remoteOpHandle = client.executeStatement(statement, true, queryTimeout)
} catch onError()
}
@@ -86,7 +89,7 @@ class ExecuteStatement(
var statusResp: TGetOperationStatusResp = null
var currentAttempts = 0
- def getOperationStatusWithRetry: Unit = {
+ def fetchOperationStatusWithRetry(): Unit = {
try {
statusResp = client.getOperationStatus(_remoteOpHandle)
currentAttempts = 0 // reset attempts whenever get touch with engine again
@@ -104,7 +107,7 @@ class ExecuteStatement(
}
// initialize operation status
- while (statusResp == null) { getOperationStatusWithRetry }
+ while (statusResp == null) { fetchOperationStatusWithRetry() }
var isComplete = false
while (!isComplete) {
@@ -116,7 +119,7 @@ class ExecuteStatement(
remoteState match {
case INITIALIZED_STATE | PENDING_STATE | RUNNING_STATE =>
isComplete = false
- getOperationStatusWithRetry
+ fetchOperationStatusWithRetry()
case FINISHED_STATE =>
setState(OperationState.FINISHED)
@@ -164,22 +167,15 @@ class ExecuteStatement(
}
override protected def runInternal(): Unit = {
- if (shouldRunAsync) {
- executeStatement()
- val sessionManager = session.sessionManager
- val asyncOperation = new Runnable {
- override def run(): Unit = waitStatementComplete()
- }
- try {
- val backgroundOperation =
- sessionManager.submitBackgroundOperation(asyncOperation)
- setBackgroundHandle(backgroundOperation)
- } catch onError("submitting query in background, query rejected")
- } else {
- setState(OperationState.RUNNING)
- executeStatement()
- setState(OperationState.FINISHED)
- }
+ executeStatement()
+ val sessionManager = session.sessionManager
+ val asyncOperation: Runnable = () => waitStatementComplete()
+ try {
+ val opHandle = sessionManager.submitBackgroundOperation(asyncOperation)
+ setBackgroundHandle(opHandle)
+ } catch onError("submitting query in background, query rejected")
+
+ if (!shouldRunAsync) getBackgroundHandle.get()
}
override def setState(newState: OperationState): Unit = {
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index cb8bf70..d847b0c 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -80,8 +80,9 @@ class KyuubiSessionImpl(
private def openSession(host: String, port: Int): Unit = {
val passwd = Option(password).filter(_.nonEmpty).getOrElse("anonymous")
val loginTimeout = sessionConf.get(ENGINE_LOGIN_TIMEOUT).toInt
+ val requestTimeout = sessionConf.get(ENGINE_REQUEST_TIMEOUT).toInt
transport = PlainSASLHelper.getPlainTransport(
- user, passwd, new TSocket(host, port, loginTimeout))
+ user, passwd, new TSocket(host, port, requestTimeout, loginTimeout))
if (!transport.isOpen) {
transport.open()
logSessionInfo(s"Connected to engine [$host:$port]")
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
index e0011b1..0a16da9 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
@@ -37,7 +37,7 @@ class KyuubiOperationPerConnectionSuite extends WithKyuubiServer with HiveJDBCTe
KyuubiConf().set(KyuubiConf.ENGINE_SHARE_LEVEL, "connection")
}
- test("KYUUBI #647 - engine crash") {
+ test("KYUUBI #647 - async query causes engine crash") {
withSessionHandle { (client, handle) =>
val executeStmtReq = new TExecuteStatementReq()
executeStmtReq.setStatement("select java_method('java.lang.System', 'exit', 1)")
@@ -65,4 +65,36 @@ class KyuubiOperationPerConnectionSuite extends WithKyuubiServer with HiveJDBCTe
assert(verboseMessage.contains("Failed to detect the root cause"))
}
}
+
+ test("client sync query cost time longer than engine.request.timeout") {
+ withSessionConf(Map(
+ KyuubiConf.ENGINE_REQUEST_TIMEOUT.key -> "PT5S"
+ ))(Map.empty)(Map.empty) {
+ withSessionHandle { (client, handle) =>
+ val executeStmtReq = new TExecuteStatementReq()
+ executeStmtReq.setStatement("select java_method('java.lang.Thread', 'sleep', 6000L)")
+ executeStmtReq.setSessionHandle(handle)
+ executeStmtReq.setRunAsync(false)
+ val executeStmtResp = client.ExecuteStatement(executeStmtReq)
+ val getOpStatusReq = new TGetOperationStatusReq(executeStmtResp.getOperationHandle)
+ val getOpStatusResp = client.GetOperationStatus(getOpStatusReq)
+ assert(getOpStatusResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
+ assert(getOpStatusResp.getOperationState === TOperationState.FINISHED_STATE)
+ }
+ }
+ }
+
+ test("sync query causes engine crash") {
+ withSessionHandle { (client, handle) =>
+ val executeStmtReq = new TExecuteStatementReq()
+ executeStmtReq.setStatement("select java_method('java.lang.System', 'exit', 1)")
+ executeStmtReq.setSessionHandle(handle)
+ executeStmtReq.setRunAsync(false)
+ val executeStmtResp = client.ExecuteStatement(executeStmtReq)
+ assert(executeStmtResp.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
+ assert(executeStmtResp.getOperationHandle === null)
+ assert(executeStmtResp.getStatus.getErrorMessage contains
+ "Caused by: java.net.SocketException: Broken pipe (Write failed)")
+ }
+ }
}
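The diff shows only part of `fetchOperationStatusWithRetry`. The counter `currentAttempts` is reset to `0` whenever a status call succeeds, and the catch branch lies outside the hunk. The sketch below illustrates that reset-on-success pattern under two assumptions: the poller gives up after `maxAttempts` consecutive failures, and there is no delay between polls. It is a hypothetical illustration (`StatusRetryModel` is not a Kyuubi class), not the actual Kyuubi retry logic.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object StatusRetryModel {
  // fetch() returns Success(isComplete) or Failure(error) for one status call.
  // Returns Right(number of calls) once complete, or Left(last error) after
  // maxAttempts consecutive failures.
  def pollUntilDone(fetch: () => Try[Boolean], maxAttempts: Int): Either[Throwable, Int] = {
    @tailrec
    def loop(calls: Int, failures: Int): Either[Throwable, Int] =
      fetch() match {
        case Success(true)                             => Right(calls + 1)
        case Success(false)                            => loop(calls + 1, 0) // reset on success
        case Failure(e) if failures + 1 >= maxAttempts => Left(e)
        case Failure(_)                                => loop(calls + 1, failures + 1)
      }
    loop(0, 0)
  }
}
```

Because the counter resets on every success, occasional transient failures are tolerated for arbitrarily long queries. Only a run of `maxAttempts` failures in a row, such as a crashed engine, ends the loop with an error.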