This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 393d32a30 [KYUUBI #3615] Retry opening the engine when encountering a
special error
393d32a30 is described below
commit 393d32a30c8feeb16700299005fbfeb172ffaa85
Author: Fu Chen <[email protected]>
AuthorDate: Thu Oct 20 14:31:35 2022 +0800
[KYUUBI #3615] Retry opening the engine when encountering a special error
### _Why are the changes needed?_
To close #3615.
Steps to reproduce:
1. start kyuubi server
2. connect to the kyuubi server with client beeline and run query `select 1`
3. force kill the spark application
4. immediately connect to the kyuubi server with another client **A**, which
fails with the following error
```
Error: org.apache.kyuubi.KyuubiSQLException: Error operating LaunchEngine:
org.apache.thrift.transport.TTransportException: java.net.ConnectException:
Connection refused (Connection refused)
at org.apache.thrift.transport.TSocket.open(TSocket.java:226)
at
org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:266)
at
org.apache.thrift.transport.TSaslClientTransport.open(TSaslClientTransport.java:37)
at
org.apache.kyuubi.client.KyuubiSyncThriftClient$.createTProtocol(KyuubiSyncThriftClient.scala:466)
at
org.apache.kyuubi.client.KyuubiSyncThriftClient$.createClient(KyuubiSyncThriftClient.scala:482)
at
org.apache.kyuubi.session.KyuubiSessionImpl.$anonfun$openEngineSession$1(KyuubiSessionImpl.scala:128)
at
org.apache.kyuubi.session.KyuubiSessionImpl.$anonfun$openEngineSession$1$adapted(KyuubiSessionImpl.scala:113)
at
org.apache.kyuubi.ha.client.DiscoveryClientProvider$.withDiscoveryClient(DiscoveryClientProvider.scala:36)
at
org.apache.kyuubi.session.KyuubiSessionImpl.openEngineSession(KyuubiSessionImpl.scala:113)
at
org.apache.kyuubi.operation.LaunchEngine.$anonfun$runInternal$2(LaunchEngine.scala:60)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.ConnectException: Connection refused (Connection
refused)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at
java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at
java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at
java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at org.apache.thrift.transport.TSocket.open(TSocket.java:221)
... 14 more (state=,code=0)
```
If the engine exits without running its shutdown hook, its ZNODE (created in
EPHEMERAL_SEQUENTIAL mode) is deleted only once the engine's ZooKeeper client
is considered disconnected, which happens after
`kyuubi.zookeeper.embedded.max.session.timeout` (1 minute by default). Until
then, a new client is still pointed at the dead engine and the connection is
refused.
This PR retries opening the engine when encountering a special exception
(java.net.ConnectException). Before this PR, client **A** throws an
exception; after this PR, client **A** works properly.
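The retry behavior described above can be sketched in isolation as follows.
This is a minimal illustration, not the code in this patch: `OpenWithRetry`,
`retryOnConnectFailure` and their parameters are hypothetical names, and the
sketch only checks for a `java.net.ConnectException` cause, whereas the patch
also matches the "Connection refused" message and re-resolves the engine
address on every attempt.

```scala
import java.net.ConnectException

object OpenWithRetry {
  // Hypothetical helper (not part of Kyuubi): calls `open` and retries only
  // when the failure is, or is directly caused by, a ConnectException.
  // At most `maxAttempts` retries are made; any other failure, or a connect
  // failure after the last retry, is rethrown to the caller.
  def retryOnConnectFailure[T](maxAttempts: Int, retryWaitMs: Long)(open: () => T): T = {
    var attempt = 0
    var result: Option[T] = None
    while (result.isEmpty) {
      try {
        result = Some(open())
      } catch {
        case e: Exception if attempt < maxAttempts && isConnectFailure(e) =>
          // Give the stale engine registration time to expire before retrying.
          Thread.sleep(retryWaitMs)
          attempt += 1
      }
    }
    result.get
  }

  // True when the exception itself or its direct cause is a ConnectException.
  private def isConnectFailure(e: Throwable): Boolean = {
    val cause = Option(e.getCause).getOrElse(e)
    cause.isInstanceOf[ConnectException]
  }
}
```

With `maxAttempts = 9` and `retryWaitMs = 10000` (the defaults this PR
introduces for `kyuubi.session.engine.open.max.attempts` and
`kyuubi.session.engine.open.retry.wait`), such a loop would keep trying for
roughly 90 seconds, which is longer than the 1 minute default session timeout.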
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #3618 from cfmcgrady/kyuubi-3615.
Closes #3615
96acd66b [Fu Chen] address comment
a1395a60 [Fu Chen] fix style
870216d1 [Fu Chen] Retry opening the engine when encountering a special
error
Authored-by: Fu Chen <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
docs/deployment/settings.md | 2 +
.../org/apache/kyuubi/config/KyuubiConf.scala | 14 +++++++
.../apache/kyuubi/session/KyuubiSessionImpl.scala | 45 +++++++++++++++-------
.../operation/KyuubiOperationPerUserSuite.scala | 34 ++++++++++++++++
4 files changed, 82 insertions(+), 13 deletions(-)
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 8c019d2b1..de61e18ee 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -457,6 +457,8 @@ kyuubi.session.engine.initialize.timeout|PT3M|Timeout for
starting the backgroun
kyuubi.session.engine.launch.async|true|When opening kyuubi session, whether
to launch backend engine asynchronously. When true, the Kyuubi server will set
up the connection with the client without delay as the backend engine will be
created asynchronously.|boolean|1.4.0
kyuubi.session.engine.log.timeout|PT24H|If we use Spark as the engine then the
session submit log is the console output of spark-submit. We will retain the
session submit log until over the config value.|duration|1.1.0
kyuubi.session.engine.login.timeout|PT15S|The timeout of creating the
connection to remote sql query engine|duration|1.0.0
+kyuubi.session.engine.open.max.attempts|9|The number of times an open engine
will retry when encountering a special error.|int|1.7.0
+kyuubi.session.engine.open.retry.wait|PT10S|How long to wait before retrying
to open engine after a failure.|duration|1.7.0
kyuubi.session.engine.share.level|USER|(deprecated) - Using
kyuubi.engine.share.level instead|string|1.0.0
kyuubi.session.engine.spark.main.resource|<undefined>|The package used
to create Spark SQL engine remote application. If it is undefined, Kyuubi will
use the default|string|1.0.0
kyuubi.session.engine.spark.max.lifetime|PT0S|Max lifetime for spark engine,
the engine will self-terminate when it reaches the end of life. 0 or negative
means not to self-terminate.|duration|1.6.0
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 3779cbe84..93ef7d15d 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -955,6 +955,20 @@ object KyuubiConf {
.timeConf
.createWithDefault(Duration.ofSeconds(120).toMillis)
+ val ENGINE_OPEN_MAX_ATTEMPTS: ConfigEntry[Int] =
+ buildConf("kyuubi.session.engine.open.max.attempts")
+ .doc("The number of times an open engine will retry when encountering a
special error.")
+ .version("1.7.0")
+ .intConf
+ .createWithDefault(9)
+
+ val ENGINE_OPEN_RETRY_WAIT: ConfigEntry[Long] =
+ buildConf("kyuubi.session.engine.open.retry.wait")
+ .doc("How long to wait before retrying to open engine after a failure.")
+ .version("1.7.0")
+ .timeConf
+ .createWithDefault(Duration.ofSeconds(10).toMillis)
+
val ENGINE_INIT_TIMEOUT: ConfigEntry[Long] =
buildConf("kyuubi.session.engine.initialize.timeout")
.doc("Timeout for starting the background engine, e.g. SparkSQLEngine.")
.version("1.0.0")
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index 2f96c20f5..c1e17deb3 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -117,26 +117,45 @@ class KyuubiSessionImpl(
openEngineSessionConf =
optimizedConf ++ Map(KYUUBI_ENGINE_CREDENTIALS_KEY ->
engineCredentials)
}
- val (host, port) = engine.getOrCreate(discoveryClient, extraEngineLog)
val passwd =
if (sessionManager.getConf.get(ENGINE_SECURITY_ENABLED)) {
InternalSecurityAccessor.get().issueToken()
} else {
Option(password).filter(_.nonEmpty).getOrElse("anonymous")
}
- try {
- _client = KyuubiSyncThriftClient.createClient(user, passwd, host,
port, sessionConf)
- _engineSessionHandle = _client.openSession(protocol, user, passwd,
openEngineSessionConf)
- } catch {
- case e: Throwable =>
- error(
- s"Opening engine [${engine.defaultEngineName} $host:$port]" +
- s" for $user session failed",
- e)
- throw e
+
+ val maxAttempts = sessionManager.getConf.get(ENGINE_OPEN_MAX_ATTEMPTS)
+ val retryWait = sessionManager.getConf.get(ENGINE_OPEN_RETRY_WAIT)
+ var attempt = 0
+ var shouldRetry = true
+ while (attempt <= maxAttempts && shouldRetry) {
+ val (host, port) = engine.getOrCreate(discoveryClient, extraEngineLog)
+ try {
+ _client = KyuubiSyncThriftClient.createClient(user, passwd, host,
port, sessionConf)
+ _engineSessionHandle = _client.openSession(protocol, user, passwd,
openEngineSessionConf)
+ logSessionInfo(s"Connected to engine
[$host:$port]/[${client.engineId.getOrElse("")}]" +
+ s" with ${_engineSessionHandle}]")
+ shouldRetry = false
+ } catch {
+ case e: org.apache.thrift.transport.TTransportException
+ if attempt < maxAttempts &&
e.getCause.isInstanceOf[java.net.ConnectException] &&
+ e.getCause.getMessage.contains("Connection refused (Connection
refused)") =>
+ warn(
+ s"Failed to open [${engine.defaultEngineName} $host:$port]
after" +
+ s" $attempt/$maxAttempts times, retrying",
+ e.getCause)
+ Thread.sleep(retryWait)
+ shouldRetry = true
+ case e: Throwable =>
+ error(
+ s"Opening engine [${engine.defaultEngineName} $host:$port]" +
+ s" for $user session failed",
+ e)
+ throw e
+ } finally {
+ attempt += 1
+ }
}
- logSessionInfo(s"Connected to engine
[$host:$port]/[${client.engineId.getOrElse("")}]" +
- s" with ${_engineSessionHandle}]")
sessionEvent.openedTime = System.currentTimeMillis()
sessionEvent.remoteSessionId = _engineSessionHandle.identifier.toString
_client.engineId.foreach(e => sessionEvent.engineId = e)
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
index 4cb3f7ef7..88830d80d 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
@@ -29,6 +29,7 @@ import
org.apache.kyuubi.config.KyuubiConf.KYUUBI_ENGINE_ENV_PREFIX
import org.apache.kyuubi.engine.SemanticVersion
import org.apache.kyuubi.jdbc.hive.KyuubiStatement
import org.apache.kyuubi.session.{KyuubiSessionImpl, KyuubiSessionManager,
SessionHandle}
+import org.apache.kyuubi.zookeeper.ZookeeperConf
class KyuubiOperationPerUserSuite
extends WithKyuubiServer with SparkQueryTests with WithSimpleDFSService {
@@ -280,4 +281,37 @@ class KyuubiOperationPerUserSuite
}
}
}
+
+ test("the new client should work properly when the engine exits
unexpectedly") {
+ assume(!httpMode)
+ withSessionConf(Map(
+ ZookeeperConf.ZK_MAX_SESSION_TIMEOUT.key -> "10000"))(Map.empty)(
+ Map.empty) {
+ withSessionHandle { (client, handle) =>
+ val preReq = new TExecuteStatementReq()
+ preReq.setStatement("SET kyuubi.operation.language=scala")
+ preReq.setSessionHandle(handle)
+ preReq.setRunAsync(false)
+ client.ExecuteStatement(preReq)
+
+ val exitReq = new TExecuteStatementReq()
+ // force kill engine without shutdown hook
+ exitReq.setStatement("java.lang.Runtime.getRuntime().halt(-1)")
+ exitReq.setSessionHandle(handle)
+ exitReq.setRunAsync(true)
+ client.ExecuteStatement(exitReq)
+ }
+ withSessionHandle { (client, handle) =>
+ val preReq = new TExecuteStatementReq()
+ preReq.setStatement("select engine_name()")
+ preReq.setSessionHandle(handle)
+ preReq.setRunAsync(false)
+ val tExecuteStatementResp = client.ExecuteStatement(preReq)
+ val opHandle = tExecuteStatementResp.getOperationHandle
+ waitForOperationToComplete(client, opHandle)
+ assert(tExecuteStatementResp.getStatus.getStatusCode ===
TStatusCode.SUCCESS_STATUS)
+ }
+ }
+ }
+
}