This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 3667026eec [KYUUBI #7281] Support to enable asynchronous post event
3667026eec is described below
commit 3667026eece44b44334ce5f50d18b1ce95e2b19a
Author: Wang, Fei <[email protected]>
AuthorDate: Sun Dec 21 22:19:26 2025 -0800
[KYUUBI #7281] Support to enable asynchronous post event
### Why are the changes needed?
`registerAsync` is not called anywhere in the main code, only in unit tests.
<img width="1234" height="563" alt="image"
src="https://github.com/user-attachments/assets/de41964b-bc8c-4867-9f6a-99e8984add91"
/>
This PR adds an option to enable asynchronous event posting for the Kyuubi server.
### How was this patch tested?
This change only adds an option; the underlying functionality is already covered by
https://github.com/apache/kyuubi/pull/1853.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #7281 from turboFei/async_event.
Closes #7281
1e5da4c28 [Wang, Fei] Support to enable post event asynchronous
Authored-by: Wang, Fei <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
---
docs/configuration/settings.md | 29 +++++++++++-----------
.../org/apache/kyuubi/config/KyuubiConf.scala | 8 ++++++
.../kyuubi/events/EventHandlerRegister.scala | 8 +++++-
.../kyuubi/events/ServerEventHandlerRegister.scala | 4 +++
4 files changed, 34 insertions(+), 15 deletions(-)
diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index bd45438e37..5e942d5b86 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -60,20 +60,21 @@ You can configure the Kyuubi properties in
`$KYUUBI_HOME/conf/kyuubi-defaults.co
### Backend
-| Key | Default
|
[...]
-|--------------------------------------------------|---------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
-| kyuubi.backend.engine.exec.pool.keepalive.time | PT1M
| Time(ms) that an idle async thread of the operation execution thread pool
will wait for a new task to arrive before terminating in SQL engine
applications
[...]
-| kyuubi.backend.engine.exec.pool.shutdown.timeout | PT10S
| Timeout(ms) for the operation execution thread pool to terminate in SQL
engine applications
[...]
-| kyuubi.backend.engine.exec.pool.size | 100
| Number of threads in the operation execution thread pool of SQL engine
applications
[...]
-| kyuubi.backend.engine.exec.pool.wait.queue.size | 100
| Size of the wait queue for the operation execution thread pool in SQL engine
applications
[...]
-| kyuubi.backend.server.event.json.log.path | file:///tmp/kyuubi/events
| The location of server events go for the built-in JSON logger
[...]
-| kyuubi.backend.server.event.kafka.close.timeout | PT5S
| Period to wait for Kafka producer of server event handlers to close.
[...]
-| kyuubi.backend.server.event.kafka.topic | <undefined>
| The topic of server events go for the built-in Kafka logger
[...]
-| kyuubi.backend.server.event.loggers
|| A comma-separated list of server history loggers, where session/operation
etc events go.<ul> <li>JSON: the events will be written to the location of
kyuubi.backend.server.event.json.log.path</li> <li>KAFKA: the events will be
serialized in JSON format and sent to topic of
`kyuubi.backend.server.event.kafka.topic`. Note: For the configs of Kafka
producer, please specify them with the prefix: `kyuubi.backend.s [...]
-| kyuubi.backend.server.exec.pool.keepalive.time | PT1M
| Time(ms) that an idle async thread of the operation execution thread pool
will wait for a new task to arrive before terminating in Kyuubi server
[...]
-| kyuubi.backend.server.exec.pool.shutdown.timeout | PT10S
| Timeout(ms) for the operation execution thread pool to terminate in Kyuubi
server
[...]
-| kyuubi.backend.server.exec.pool.size | 100
| Number of threads in the operation execution thread pool of Kyuubi server
[...]
-| kyuubi.backend.server.exec.pool.wait.queue.size | 100
| Size of the wait queue for the operation execution thread pool of Kyuubi
server
[...]
+| Key | Default
|
[...]
+|--------------------------------------------------|---------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
+| kyuubi.backend.engine.exec.pool.keepalive.time | PT1M
| Time(ms) that an idle async thread of the operation execution thread pool
will wait for a new task to arrive before terminating in SQL engine
applications
[...]
+| kyuubi.backend.engine.exec.pool.shutdown.timeout | PT10S
| Timeout(ms) for the operation execution thread pool to terminate in SQL
engine applications
[...]
+| kyuubi.backend.engine.exec.pool.size | 100
| Number of threads in the operation execution thread pool of SQL engine
applications
[...]
+| kyuubi.backend.engine.exec.pool.wait.queue.size | 100
| Size of the wait queue for the operation execution thread pool in SQL engine
applications
[...]
+| kyuubi.backend.server.event.async.enabled | false
| Whether backend server event logging is asynchronous.
[...]
+| kyuubi.backend.server.event.json.log.path | file:///tmp/kyuubi/events
| The location of server events go for the built-in JSON logger
[...]
+| kyuubi.backend.server.event.kafka.close.timeout | PT5S
| Period to wait for Kafka producer of server event handlers to close.
[...]
+| kyuubi.backend.server.event.kafka.topic | <undefined>
| The topic of server events go for the built-in Kafka logger
[...]
+| kyuubi.backend.server.event.loggers
|| A comma-separated list of server history loggers, where session/operation
etc events go.<ul> <li>JSON: the events will be written to the location of
kyuubi.backend.server.event.json.log.path</li> <li>KAFKA: the events will be
serialized in JSON format and sent to topic of
`kyuubi.backend.server.event.kafka.topic`. Note: For the configs of Kafka
producer, please specify them with the prefix: `kyuubi.backend.s [...]
+| kyuubi.backend.server.exec.pool.keepalive.time | PT1M
| Time(ms) that an idle async thread of the operation execution thread pool
will wait for a new task to arrive before terminating in Kyuubi server
[...]
+| kyuubi.backend.server.exec.pool.shutdown.timeout | PT10S
| Timeout(ms) for the operation execution thread pool to terminate in Kyuubi
server
[...]
+| kyuubi.backend.server.exec.pool.size | 100
| Number of threads in the operation execution thread pool of Kyuubi server
[...]
+| kyuubi.backend.server.exec.pool.wait.queue.size | 100
| Size of the wait queue for the operation execution thread pool of Kyuubi
server
[...]
### Batch
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index c1fad17ec0..1a10a1101b 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -2685,6 +2685,14 @@ object KyuubiConf {
"Unsupported event loggers")
.createWithDefault(Nil)
+ val SERVER_EVENT_ASYNC_ENABLED: ConfigEntry[Boolean] =
+ buildConf("kyuubi.backend.server.event.async.enabled")
+ .doc("Whether backend server event logging is asynchronous.")
+ .version("1.11.0")
+ .serverOnly
+ .booleanConf
+ .createWithDefault(false)
+
@deprecated("using kyuubi.engine.spark.event.loggers instead", "1.6.0")
val ENGINE_EVENT_LOGGERS: ConfigEntry[Seq[String]] =
buildConf("kyuubi.engine.event.loggers")
diff --git
a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventHandlerRegister.scala
b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventHandlerRegister.scala
index f75e4be4f5..4f2973b697 100644
---
a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventHandlerRegister.scala
+++
b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventHandlerRegister.scala
@@ -25,6 +25,8 @@ trait EventHandlerRegister extends Logging {
protected def getLoggers(conf: KyuubiConf): Seq[String]
+ protected def asyncEventLogging(conf: KyuubiConf): Boolean = false
+
def registerEventLoggers(conf: KyuubiConf): Unit = {
val loggers = getLoggers(conf)
register(loggers, conf)
@@ -35,7 +37,11 @@ trait EventHandlerRegister extends Logging {
.map(EventLoggerType.withName)
.foreach { logger =>
val handlers = loadEventHandler(logger, conf)
- handlers.foreach(EventBus.register)
+ if (asyncEventLogging(conf)) {
+ handlers.foreach(EventBus.registerAsync)
+ } else {
+ handlers.foreach(EventBus.register)
+ }
}
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/ServerEventHandlerRegister.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/ServerEventHandlerRegister.scala
index ca6c776ac8..d44916b26b 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/ServerEventHandlerRegister.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/ServerEventHandlerRegister.scala
@@ -56,4 +56,8 @@ object ServerEventHandlerRegister extends
EventHandlerRegister {
override protected def getLoggers(conf: KyuubiConf): Seq[String] = {
conf.get(SERVER_EVENT_LOGGERS)
}
+
+ override protected def asyncEventLogging(conf: KyuubiConf): Boolean = {
+ conf.get(SERVER_EVENT_ASYNC_ENABLED)
+ }
}