This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new c24e984a3 [KYUUBI #5423] Support chaining SessionConfAdvisors
c24e984a3 is described below
commit c24e984a3e8a41ad20bc4a5517e51a856e3e5452
Author: senmiaoliu <[email protected]>
AuthorDate: Mon Oct 30 09:48:25 2023 +0800
[KYUUBI #5423] Support chaining SessionConfAdvisors
### _Why are the changes needed?_
close #5423
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
locally before make a pull request
### _Was this patch authored or co-authored using generative AI tooling?_
NO
Closes #5485 from lsm1/branch-5423.
Closes #5423
0bd3b0233 [senmiaoliu] mention in the migration guide
dddafeb9b [senmiaoliu] support multi session conf advisor
Authored-by: senmiaoliu <[email protected]>
Signed-off-by: ulyssesyou <[email protected]>
---
docs/configuration/settings.md | 2 +-
docs/deployment/migration-guide.md | 4 ++
.../org/apache/kyuubi/config/KyuubiConf.scala | 5 ++-
.../org/apache/kyuubi/plugin/PluginLoader.scala | 24 ++++++------
.../apache/kyuubi/session/KyuubiBatchSession.scala | 6 +--
.../apache/kyuubi/session/KyuubiSessionImpl.scala | 6 +--
.../kyuubi/session/KyuubiSessionManager.scala | 2 +-
.../apache/kyuubi/plugin/PluginLoaderSuite.scala | 43 ++++++++++++++++++----
8 files changed, 63 insertions(+), 29 deletions(-)
diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index 1bf1230e1..5e00d0b75 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -410,7 +410,7 @@ You can configure the Kyuubi properties in
`$KYUUBI_HOME/conf/kyuubi-defaults.co
|------------------------------------------------------|-------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
| kyuubi.session.check.interval | PT5M
| The check interval for session timeout.
[...]
| kyuubi.session.close.on.disconnect | true
| Session will be closed when client disconnects from kyuubi gateway. Set
this to false to have session outlive its parent connection.
[...]
-| kyuubi.session.conf.advisor | <undefined>
| A config advisor plugin for Kyuubi Server. This plugin can provide some
custom configs for different users or session configs and overwrite the session
configs before opening a new session. This config value should be a subclass of
`org.apache.kyuubi.plugin.SessionConfAdvisor` which has a zero-arg constructor.
[...]
+| kyuubi.session.conf.advisor | <undefined>
| A config advisor plugin for Kyuubi Server. This plugin can provide a list
of custom configs for different users or session configs and overwrite the
session configs before opening a new session. This config value should be a
subclass of `org.apache.kyuubi.plugin.SessionConfAdvisor` which has a zero-arg
constructor.
[...]
| kyuubi.session.conf.file.reload.interval | PT10M
| When `FileSessionConfAdvisor` is used, this configuration defines the
expired time of `$KYUUBI_CONF_DIR/kyuubi-session-<profile>.conf` in the cache.
After exceeding this value, the file will be reloaded.
[...]
| kyuubi.session.conf.ignore.list
|| A comma-separated list of ignored keys. If the client connection contains
any of them, the key and the corresponding value will be removed silently
during engine bootstrap and connection setup. Note that this rule is for
server-side protection defined via administrators to prevent some essential
configs from tampering but will not forbid users to set dynamic configurations
via SET syntax. [...]
| kyuubi.session.conf.profile | <undefined>
| Specify a profile to load session-level configurations from
`$KYUUBI_CONF_DIR/kyuubi-session-<profile>.conf`. This configuration will be
ignored if the file does not exist. This configuration only takes effect when
`kyuubi.session.conf.advisor` is set as
`org.apache.kyuubi.session.FileSessionConfAdvisor`.
[...]
diff --git a/docs/deployment/migration-guide.md
b/docs/deployment/migration-guide.md
index 0430e8cff..bf5b184cd 100644
--- a/docs/deployment/migration-guide.md
+++ b/docs/deployment/migration-guide.md
@@ -17,6 +17,10 @@
# Kyuubi Migration Guide
+## Upgrading from Kyuubi 1.8 to 1.9
+
+* Since Kyuubi 1.9.0, `kyuubi.session.conf.advisor` can be set as a sequence,
Kyuubi supported chaining SessionConfAdvisors.
+
## Upgrading from Kyuubi 1.7 to 1.8
* Since Kyuubi 1.8, SQLite is added and becomes the default database type of
Kyuubi metastore, as Derby has been deprecated.
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index a5c0aee0a..981db1720 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -2468,14 +2468,15 @@ object KyuubiConf {
.checkValues(OperationLanguages)
.createWithDefault(OperationLanguages.SQL.toString)
- val SESSION_CONF_ADVISOR: OptionalConfigEntry[String] =
+ val SESSION_CONF_ADVISOR: OptionalConfigEntry[Seq[String]] =
buildConf("kyuubi.session.conf.advisor")
- .doc("A config advisor plugin for Kyuubi Server. This plugin can provide
some custom " +
+ .doc("A config advisor plugin for Kyuubi Server. This plugin can provide
a list of custom " +
"configs for different users or session configs and overwrite the
session configs before " +
"opening a new session. This config value should be a subclass of " +
"`org.apache.kyuubi.plugin.SessionConfAdvisor` which has a zero-arg
constructor.")
.version("1.5.0")
.stringConf
+ .toSequence()
.createOptional
val GROUP_PROVIDER: ConfigEntry[String] =
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/plugin/PluginLoader.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/plugin/PluginLoader.scala
index da4c8e4a9..1bc80dc7d 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/plugin/PluginLoader.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/plugin/PluginLoader.scala
@@ -25,20 +25,22 @@ import org.apache.kyuubi.util.reflect.DynConstructors
private[kyuubi] object PluginLoader {
- def loadSessionConfAdvisor(conf: KyuubiConf): SessionConfAdvisor = {
+ def loadSessionConfAdvisor(conf: KyuubiConf): Seq[SessionConfAdvisor] = {
val advisorClass = conf.get(KyuubiConf.SESSION_CONF_ADVISOR)
if (advisorClass.isEmpty) {
- return new DefaultSessionConfAdvisor()
+ return new DefaultSessionConfAdvisor() :: Nil
}
-
- try {
-
DynConstructors.builder.impl(advisorClass.get).buildChecked[SessionConfAdvisor].newInstance()
- } catch {
- case _: ClassCastException =>
- throw new KyuubiException(
- s"Class ${advisorClass.get} is not a child of
'${classOf[SessionConfAdvisor].getName}'.")
- case NonFatal(e) =>
- throw new IllegalArgumentException(s"Error while instantiating
'${advisorClass.get}': ", e)
+ advisorClass.get.map { advisorClassName =>
+ try {
+ DynConstructors.builder.impl(advisorClassName)
+ .buildChecked[SessionConfAdvisor].newInstance()
+ } catch {
+ case _: ClassCastException =>
+ throw new KyuubiException(
+ s"Class $advisorClassName is not a child of
'${classOf[SessionConfAdvisor].getName}'.")
+ case NonFatal(e) =>
+ throw new IllegalArgumentException(s"Error while instantiating
'$advisorClassName': ", e)
+ }
}
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
index 8489e6d30..ccba65594 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala
@@ -81,12 +81,12 @@ class KyuubiBatchSession(
sessionConf.getBatchConf(batchType) ++
sessionManager.validateBatchConf(conf)
val optimizedConf: Map[String, String] = {
- val confOverlay = sessionManager.sessionConfAdvisor.getConfOverlay(
+ val confOverlay = sessionManager.sessionConfAdvisor.map(_.getConfOverlay(
user,
- normalizedConf.asJava)
+ normalizedConf.asJava).asScala).reduce(_ ++ _)
if (confOverlay != null) {
val overlayConf = new KyuubiConf(false)
- confOverlay.asScala.foreach { case (k, v) => overlayConf.set(k, v) }
+ confOverlay.foreach { case (k, v) => overlayConf.set(k, v) }
normalizedConf ++ overlayConf.getBatchConf(batchType)
} else {
warn(s"the server plugin return null value for user: $user, ignore it")
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index 6dd1810a8..ca411d71f 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -53,11 +53,11 @@ class KyuubiSessionImpl(
override val sessionType: SessionType = SessionType.INTERACTIVE
private[kyuubi] val optimizedConf: Map[String, String] = {
- val confOverlay = sessionManager.sessionConfAdvisor.getConfOverlay(
+ val confOverlay = sessionManager.sessionConfAdvisor.map(_.getConfOverlay(
user,
- normalizedConf.asJava)
+ normalizedConf.asJava).asScala).reduce(_ ++ _)
if (confOverlay != null) {
- normalizedConf ++ confOverlay.asScala
+ normalizedConf ++ confOverlay
} else {
warn(s"the server plugin return null value for user: $user, ignore it")
normalizedConf
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 02a3ee32c..4c625ce67 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -58,7 +58,7 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
if (conf.isRESTEnabled) Some(new MetadataManager()) else None
// lazy is required for plugins since the conf is null when this class
initialization
- lazy val sessionConfAdvisor: SessionConfAdvisor =
PluginLoader.loadSessionConfAdvisor(conf)
+ lazy val sessionConfAdvisor: Seq[SessionConfAdvisor] =
PluginLoader.loadSessionConfAdvisor(conf)
lazy val groupProvider: GroupProvider = PluginLoader.loadGroupProvider(conf)
private var limiter: Option[SessionLimiter] = None
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/plugin/PluginLoaderSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/plugin/PluginLoaderSuite.scala
index e24b79c2c..bd7f78e24 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/plugin/PluginLoaderSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/plugin/PluginLoaderSuite.scala
@@ -27,15 +27,15 @@ class PluginLoaderSuite extends KyuubiFunSuite {
test("SessionConfAdvisor - wrong class") {
val conf = new KyuubiConf(false)
-
assert(PluginLoader.loadSessionConfAdvisor(conf).isInstanceOf[DefaultSessionConfAdvisor])
+
assert(PluginLoader.loadSessionConfAdvisor(conf).head.isInstanceOf[DefaultSessionConfAdvisor])
- conf.set(KyuubiConf.SESSION_CONF_ADVISOR,
classOf[InvalidSessionConfAdvisor].getName)
+ conf.set(KyuubiConf.SESSION_CONF_ADVISOR,
Seq(classOf[InvalidSessionConfAdvisor].getName))
val msg1 = intercept[KyuubiException] {
PluginLoader.loadSessionConfAdvisor(conf)
}.getMessage
assert(msg1.contains(s"is not a child of
'${classOf[SessionConfAdvisor].getName}'"))
- conf.set(KyuubiConf.SESSION_CONF_ADVISOR, "non.exists")
+ conf.set(KyuubiConf.SESSION_CONF_ADVISOR, Seq("non.exists"))
val msg2 = intercept[IllegalArgumentException] {
PluginLoader.loadSessionConfAdvisor(conf)
}.getMessage
@@ -44,27 +44,46 @@ class PluginLoaderSuite extends KyuubiFunSuite {
test("FileSessionConfAdvisor") {
val conf = new KyuubiConf(false)
- conf.set(KyuubiConf.SESSION_CONF_ADVISOR,
classOf[FileSessionConfAdvisor].getName)
+ conf.set(KyuubiConf.SESSION_CONF_ADVISOR,
Seq(classOf[FileSessionConfAdvisor].getName))
val advisor = PluginLoader.loadSessionConfAdvisor(conf)
- val emptyConfig = advisor.getConfOverlay("chris", conf.getAll.asJava)
+ val emptyConfig =
+ advisor.map(_.getConfOverlay("chris",
conf.getAll.asJava).asScala).reduce(_ ++ _).asJava
assert(emptyConfig.isEmpty)
conf.set(KyuubiConf.SESSION_CONF_PROFILE, "non.exists")
- val nonExistsConfig = advisor.getConfOverlay("chris", conf.getAll.asJava)
+ val nonExistsConfig =
+ advisor.map(_.getConfOverlay("chris",
conf.getAll.asJava).asScala).reduce(_ ++ _).asJava
assert(nonExistsConfig.isEmpty)
conf.set(KyuubiConf.SESSION_CONF_PROFILE, "cluster-a")
- val clusterAConf = advisor.getConfOverlay("chris", conf.getAll.asJava)
+ val clusterAConf =
+ advisor.map(_.getConfOverlay("chris",
conf.getAll.asJava).asScala).reduce(_ ++ _).asJava
assert(clusterAConf.get("kyuubi.ha.namespace") == "kyuubi-ns-a")
assert(clusterAConf.get("kyuubi.zk.ha.namespace") == null)
assert(clusterAConf.size() == 5)
- val clusterAConfFromCache = advisor.getConfOverlay("chris",
conf.getAll.asJava)
+ val clusterAConfFromCache =
+ advisor.map(_.getConfOverlay("chris",
conf.getAll.asJava).asScala).reduce(_ ++ _).asJava
assert(clusterAConfFromCache.get("kyuubi.ha.namespace") == "kyuubi-ns-a")
assert(clusterAConfFromCache.get("kyuubi.zk.ha.namespace") == null)
assert(clusterAConfFromCache.size() == 5)
}
+ test("SessionConfAdvisor - multi class") {
+ val conf = new KyuubiConf(false)
+ conf.set(
+ KyuubiConf.SESSION_CONF_ADVISOR,
+ Seq(classOf[FileSessionConfAdvisor].getName,
classOf[TestSessionConfAdvisor].getName))
+ val advisor = PluginLoader.loadSessionConfAdvisor(conf)
+ conf.set(KyuubiConf.SESSION_CONF_PROFILE, "cluster-a")
+ val clusterAConf =
+ advisor.map(_.getConfOverlay("chris",
conf.getAll.asJava).asScala).reduce(_ ++ _).asJava
+ assert(clusterAConf.get("kyuubi.ha.namespace") == "kyuubi-ns-a")
+ assert(clusterAConf.get("kyuubi.zk.ha.namespace") == null)
+ assert(clusterAConf.get("spark.k3") == "v3")
+ assert(clusterAConf.size() == 7)
+ }
+
test("GroupProvider - wrong class") {
val conf = new KyuubiConf(false)
conf.set(KyuubiConf.GROUP_PROVIDER, "hadoop")
@@ -99,3 +118,11 @@ class PluginLoaderSuite extends KyuubiFunSuite {
class InvalidSessionConfAdvisor
class InvalidGroupProvider
+
+class TestSessionConfAdvisor extends SessionConfAdvisor {
+ override def getConfOverlay(
+ user: String,
+ sessionConf: java.util.Map[String, String]): java.util.Map[String,
String] = {
+ Map("spark.k3" -> "v3", "spark.k4" -> "v4").asJava
+ }
+}