This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.8 by this push:
new f091be042 [KYUUBI #5900] Support to deregister the engine on open failure
f091be042 is described below
commit f091be0424bcbcbe2858499767ebb22759737b47
Author: Fei Wang <[email protected]>
AuthorDate: Thu Dec 21 21:27:45 2023 -0800
[KYUUBI #5900] Support to deregister the engine on open failure
# :mag: Description
## Issue References 🔗
This pull request fixes #5900
## Describe Your Solution 🔧
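Add a new config, `kyuubi.session.engine.open.onFailure`, that controls what the Kyuubi server does with the engine registered in the engine space when opening a session on that engine fails:
- `RETRY` (default): keep the existing behavior and retry up to `kyuubi.session.engine.open.max.attempts` times.
- `DEREGISTER_IMMEDIATELY`: deregister the engine after every failed attempt, whether or not it will be retried.
- `DEREGISTER_AFTER_RETRY`: deregister the engine only when the open finally fails and no further retry will be made.
Deregistration goes through the new `EngineRef.deregister`, which, while holding the engine-space lock, deletes the engine node only if it still points at the host and port that failed. Errors raised while deregistering are logged as warnings so that they do not mask the original open failure.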
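The three policies differ only in when the server deregisters the engine: on a failure that will be retried, or on the failure that ends the open. The Scala sketch below restates that decision table from the two `openOnFailure` matches in `KyuubiSessionImpl`. It mirrors the `EngineOpenOnFailure` enumeration from the diff, but `OpenOnFailureSketch`, `shouldDeregister` and `parse` are hypothetical names used only for illustration, and the exact condition that makes a failure retryable is not shown in this excerpt.

```scala
// Illustrative sketch only, not Kyuubi code: restates when each
// kyuubi.session.engine.open.onFailure policy deregisters the engine.
object OpenOnFailureSketch {

  // Mirrors the EngineOpenOnFailure enumeration added to KyuubiConf.
  object EngineOpenOnFailure extends Enumeration {
    type EngineOpenOnFailure = Value
    val RETRY, DEREGISTER_IMMEDIATELY, DEREGISTER_AFTER_RETRY = Value
  }
  import EngineOpenOnFailure._

  /**
   * Hypothetical helper: should the engine be deregistered after a failed open?
   *
   * @param policy    the configured onFailure policy
   * @param willRetry true if this failure is retryable and another attempt follows,
   *                  false if the session open is about to give up and rethrow
   */
  def shouldDeregister(policy: EngineOpenOnFailure, willRetry: Boolean): Boolean =
    policy match {
      case RETRY                  => false      // never deregister
      case DEREGISTER_IMMEDIATELY => true       // deregister on every failed attempt
      case DEREGISTER_AFTER_RETRY => !willRetry // only on the final failure
    }

  // The config value is stored as a string; Enumeration#withName maps it back
  // to a Value and throws NoSuchElementException for unknown names.
  def parse(raw: String): EngineOpenOnFailure = EngineOpenOnFailure.withName(raw)
}
```

In this sketch, as with the `withName` call in the diff, parsing is case-sensitive: `parse("DEREGISTER_AFTER_RETRY")` succeeds, while `parse("retry")` throws `NoSuchElementException`.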
## Types of changes :bookmark:
- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)
## Test Plan 🧪
#### Behavior Without This Pull Request :coffin:
#### Behavior With This Pull Request :tada:
#### Related Unit Tests
---
# Checklists
## 📝 Author Self Checklist
- [ ] My code follows the [style guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html) of this project
- [ ] I have performed a self-review
- [ ] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [ ] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my feature works
- [ ] New and existing unit tests pass locally with my changes
- [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)
## 📝 Committer Pre-Merge Checklist
- [ ] Pull request title is okay.
- [ ] No license issues.
- [ ] Milestone correctly set?
- [ ] Test coverage is ok
- [ ] Assignees are selected.
- [ ] Minimum number of approvals
- [ ] No changes are requested
**Be nice. Be informative.**
Closes #5901 from turboFei/delete_engine.
Closes #5900
57bcf1f24 [Fei Wang] docs
557714eed [Fei Wang] enum
bc479e6e0 [Fei Wang] save
2b2a4952c [Fei Wang] refine
f0ace3574 [Fei Wang] reset after close client
cf07877fe [Fei Wang] re-version
d93dd6add [Fei Wang] refactor
Authored-by: Fei Wang <[email protected]>
Signed-off-by: Fei Wang <[email protected]>
(cherry picked from commit fd397d6277990e76ab5c51567d951fd271b10c88)
Signed-off-by: Fei Wang <[email protected]>
---
docs/configuration/settings.md | 1 +
.../scala/org/apache/kyuubi/config/KyuubiConf.scala | 16 ++++++++++++++++
.../scala/org/apache/kyuubi/engine/EngineRef.scala | 13 +++++++++++++
.../apache/kyuubi/session/KyuubiSessionImpl.scala | 20 ++++++++++++++++++++
4 files changed, 50 insertions(+)
diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index 522936289..bea16991f 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -439,6 +439,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.session.engine.log.timeout | PT24H | If we use Spark as the engine then the session submit log is the console output of spark-submit. We will retain the session submit log until over the config value. [...]
| kyuubi.session.engine.login.timeout | PT15S | The timeout of creating the connection to remote sql query engine [...]
| kyuubi.session.engine.open.max.attempts | 9 | The number of times an open engine will retry when encountering a special error. [...]
+| kyuubi.session.engine.open.onFailure | RETRY | The behavior when opening engine failed: <ul> <li>RETRY: retry to open engine for kyuubi.session.engine.open.max.attempts times.</li> <li>DEREGISTER_IMMEDIATELY: deregister the engine immediately.</li> <li>DEREGISTER_AFTER_RETRY: deregister the engine after retry to open engine for kyuubi.session.engine.open.max.attempts times.</li></ul> [...]
| kyuubi.session.engine.open.retry.wait | PT10S | How long to wait before retrying to open the engine after failure. [...]
| kyuubi.session.engine.share.level | USER | (deprecated) - Using kyuubi.engine.share.level instead [...]
| kyuubi.session.engine.spark.initialize.sql || The initialize sql for Spark session. It fallback to `kyuubi.engine.session.initialize.sql` [...]
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 9411323d2..56859b11a 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1516,6 +1516,22 @@ object KyuubiConf {
.timeConf
.createWithDefault(Duration.ofSeconds(10).toMillis)
+ object EngineOpenOnFailure extends Enumeration {
+ type EngineOpenOnFailure = Value
+ val RETRY, DEREGISTER_IMMEDIATELY, DEREGISTER_AFTER_RETRY = Value
+ }
+
+ val ENGINE_OPEN_ON_FAILURE: ConfigEntry[String] =
+ buildConf("kyuubi.session.engine.open.onFailure")
+ .doc("The behavior when opening engine failed: <ul>" +
+ s" <li>RETRY: retry to open engine for ${ENGINE_OPEN_MAX_ATTEMPTS.key} times.</li>" +
+ " <li>DEREGISTER_IMMEDIATELY: deregister the engine immediately.</li>" +
+ " <li>DEREGISTER_AFTER_RETRY: deregister the engine after retry to open engine for " +
+ s"${ENGINE_OPEN_MAX_ATTEMPTS.key} times.</li></ul>")
+ .version("1.8.1")
+ .stringConf
+ .createWithDefault(EngineOpenOnFailure.RETRY.toString)
+
val ENGINE_INIT_TIMEOUT: ConfigEntry[Long] =
buildConf("kyuubi.session.engine.initialize.timeout")
.doc("Timeout for starting the background engine, e.g. SparkSQLEngine.")
.version("1.0.0")
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index 6122a6f13..2bd855403 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -307,6 +307,19 @@ private[kyuubi] class EngineRef(
}
}
+ /**
+ * Deregister the engine from engine space with the given host and port on connection failure.
+ *
+ * @param discoveryClient the zookeeper client to get or create engine instance
+ * @param hostPort the existing engine host and port
+ */
+ def deregister(discoveryClient: DiscoveryClient, hostPort: (String, Int)): Unit =
+ tryWithLock(discoveryClient) {
+ if (discoveryClient.getServerHost(engineSpace) == Option(hostPort)) {
+ discoveryClient.delete(engineSpace)
+ }
+ }
+
def close(): Unit = {
if (shareLevel == CONNECTION && builder != null) {
try {
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index a5abb61fe..931a4c70e 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -27,6 +27,7 @@ import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.client.KyuubiSyncThriftClient
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.config.KyuubiConf.EngineOpenOnFailure._
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_CREDENTIALS_KEY, KYUUBI_SESSION_HANDLE_KEY, KYUUBI_SESSION_SIGN_PUBLICKEY, KYUUBI_SESSION_USER_SIGN}
import org.apache.kyuubi.engine.{EngineRef, KyuubiApplicationManager}
import org.apache.kyuubi.events.{EventBus, KyuubiSessionEvent}
@@ -141,10 +142,21 @@ class KyuubiSessionImpl(
val maxAttempts = sessionManager.getConf.get(ENGINE_OPEN_MAX_ATTEMPTS)
val retryWait = sessionManager.getConf.get(ENGINE_OPEN_RETRY_WAIT)
+ val openOnFailure =
+ EngineOpenOnFailure.withName(sessionManager.getConf.get(ENGINE_OPEN_ON_FAILURE))
var attempt = 0
var shouldRetry = true
while (attempt <= maxAttempts && shouldRetry) {
val (host, port) = engine.getOrCreate(discoveryClient, extraEngineLog)
+
+ def deregisterEngine(): Unit =
+ try {
+ engine.deregister(discoveryClient, (host, port))
+ } catch {
+ case e: Throwable =>
+ warn(s"Error on de-registering engine [${engine.engineSpace} $host:$port]", e)
+ }
+
try {
val passwd =
if (sessionManager.getConf.get(ENGINE_SECURITY_ENABLED)) {
@@ -167,6 +179,10 @@ class KyuubiSessionImpl(
s" $attempt/$maxAttempts times, retrying",
e.getCause)
Thread.sleep(retryWait)
+ openOnFailure match {
+ case DEREGISTER_IMMEDIATELY => deregisterEngine()
+ case _ =>
+ }
shouldRetry = true
case e: Throwable =>
error(
@@ -174,6 +190,10 @@ class KyuubiSessionImpl(
s" for $user session failed",
e)
openSessionError = Some(e)
+ openOnFailure match {
+ case DEREGISTER_IMMEDIATELY | DEREGISTER_AFTER_RETRY => deregisterEngine()
+ case _ =>
+ }
throw e
} finally {
attempt += 1
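`EngineRef.deregister` is a compare-and-delete: while holding the engine-space lock (`tryWithLock`), it deletes the engine node only if the host and port currently registered there still equal the engine that just failed. The Scala sketch below models that guard with a hypothetical in-memory registry, `InMemoryEngineSpace`, using a local `ReentrantLock` in place of the distributed lock; it is an illustration under those assumptions, not the `DiscoveryClient` API.

```scala
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

// Hypothetical in-memory stand-in for the engine space kept in the discovery
// service; the real code goes through DiscoveryClient and a distributed lock.
final class InMemoryEngineSpace {
  private val lock = new ReentrantLock()
  private val nodes = mutable.Map.empty[String, (String, Int)]

  // Run `body` while holding the lock, always releasing it afterwards.
  private def withLock[T](body: => T): T = {
    lock.lock()
    try body
    finally lock.unlock()
  }

  def register(space: String, hostPort: (String, Int)): Unit =
    withLock { nodes(space) = hostPort }

  def get(space: String): Option[(String, Int)] = withLock(nodes.get(space))

  /**
   * Compare-and-delete: remove the node only if it still points at the
   * engine we failed to reach. Returns true if a node was deleted.
   */
  def deregister(space: String, hostPort: (String, Int)): Boolean = withLock {
    if (nodes.get(space).contains(hostPort)) {
      nodes.remove(space)
      true
    } else {
      false
    }
  }
}
```

For example, if the space points at `("host-a", 10009)`, calling `deregister` with `("host-b", 10009)` deletes nothing and returns `false`, while calling it with `("host-a", 10009)` removes the node. Comparing against the failed host and port, rather than deleting unconditionally, presumably protects an engine that another session registered between `getOrCreate` and the failed open.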