This is an automated email from the ASF dual-hosted git repository.
bowenliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 430f6d590 [KYUUBI #4777] Deregister event handlers when stopping
server with event handler made auto-closeable
430f6d590 is described below
commit 430f6d5901cfb79267db93dffaa3dc3bb598592f
Author: liangbowen <[email protected]>
AuthorDate: Fri Apr 28 09:59:48 2023 +0800
[KYUUBI #4777] Deregister event handlers when stopping server with event
handler made auto-closeable
### _Why are the changes needed?_
- deregister event handlers when stopping KyuubiServer, by deregistering them
from EventBus's handler Registry
- change `EventHandler` from a `type` alias to a `trait` and make it extend
`AutoCloseable`
- implement `close` method in `JsonLoggingEventHandler` for closing writers
and streams
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #4777 from bowenliang123/closable-eventlogger.
Closes #4777
db1ad5d73 [liangbowen] make EventBus.deregisterAll method synchronized
648471ba1 [liangbowen] update
d28931d3c [liangbowen] re-register event loggers in ut
7121fa33a [liangbowen] make EventHandler closable, and de-register all
event handlers when stopping server
Authored-by: liangbowen <[email protected]>
Signed-off-by: liangbowen <[email protected]>
---
.../main/scala/org/apache/kyuubi/events/EventBus.scala | 16 ++++++++++++++++
.../kyuubi/events/handler/JsonLoggingEventHandler.scala | 16 ++++++++++++++--
.../scala/org/apache/kyuubi/events/handler/package.scala | 6 +++++-
.../scala/org/apache/kyuubi/server/KyuubiServer.scala | 4 +++-
.../handler/ServerJsonLoggingEventHandlerSuite.scala | 2 ++
5 files changed, 40 insertions(+), 4 deletions(-)
diff --git
a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventBus.scala
b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventBus.scala
index e854e40a7..063f1719e 100644
--- a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventBus.scala
+++ b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventBus.scala
@@ -40,6 +40,8 @@ sealed trait EventBus {
def register[T <: KyuubiEvent: ClassTag](eventHandler: EventHandler[T]):
EventBus
def registerAsync[T <: KyuubiEvent: ClassTag](eventHandler:
EventHandler[T]): EventBus
+
+ def deregisterAll(): Unit = {}
}
object EventBus extends Logging {
@@ -68,6 +70,10 @@ object EventBus extends Logging {
def registerAsync[T <: KyuubiEvent: ClassTag](et: EventHandler[T]): EventBus
=
defaultEventBus.registerAsync[T](et)
+ def deregisterAll(): Unit = synchronized {
+ defaultEventBus.deregisterAll()
+ }
+
private case class EventBusLive() extends EventBus {
private[this] lazy val eventHandlerRegistry = new Registry
private[this] lazy val asyncEventHandlerRegistry = new Registry
@@ -96,6 +102,11 @@ object EventBus extends Logging {
asyncEventHandlerRegistry.register(et)
this
}
+
+ override def deregisterAll(): Unit = {
+ eventHandlerRegistry.deregisterAll()
+ asyncEventHandlerRegistry.deregisterAll()
+ }
}
private class Registry {
@@ -122,5 +133,10 @@ object EventBus extends Logging {
} yield parent
clazz :: parents
}
+
+ def deregisterAll(): Unit = {
+ eventHandlers.values.flatten.foreach(_.close())
+ eventHandlers.clear()
+ }
}
}
diff --git
a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/JsonLoggingEventHandler.scala
b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/JsonLoggingEventHandler.scala
index f6f74de9a..77d80b152 100644
---
a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/JsonLoggingEventHandler.scala
+++
b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/JsonLoggingEventHandler.scala
@@ -65,6 +65,17 @@ class JsonLoggingEventHandler(
stream.foreach(_.hflush())
}
+ override def close(): Unit = {
+ writers.values.foreach { case (writer, stream) =>
+ writer.flush()
+ stream.foreach(_.hflush())
+ writer.close()
+ stream.foreach(_.close())
+ }
+ writers.clear()
+ fs = null
+ }
+
private def getOrUpdate(event: KyuubiEvent): Logger = synchronized {
val partitions = event.partitions.map(kv =>
s"${kv._1}=${kv._2}").mkString(Path.SEPARATOR)
writers.getOrElseUpdate(
@@ -108,6 +119,7 @@ class JsonLoggingEventHandler(
}
object JsonLoggingEventHandler {
- val JSON_LOG_DIR_PERM: FsPermission = new
FsPermission(Integer.parseInt("770", 8).toShort)
- val JSON_LOG_FILE_PERM: FsPermission = new
FsPermission(Integer.parseInt("660", 8).toShort)
+ private val JSON_LOG_DIR_PERM: FsPermission = new
FsPermission(Integer.parseInt("770", 8).toShort)
+ private val JSON_LOG_FILE_PERM: FsPermission =
+ new FsPermission(Integer.parseInt("660", 8).toShort)
}
diff --git
a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/package.scala
b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/package.scala
index 41cf001ed..69e1fdcee 100644
---
a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/package.scala
+++
b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/package.scala
@@ -18,5 +18,9 @@
package org.apache.kyuubi.events
package object handler {
- type EventHandler[T <: KyuubiEvent] = T => Unit
+ trait EventHandler[T <: KyuubiEvent] extends AutoCloseable {
+ def apply(event: T): Unit
+
+ def close(): Unit = {}
+ }
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
index a7f2e8178..8bcd8d084 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
@@ -193,5 +193,7 @@ class KyuubiServer(name: String) extends Serverable(name) {
ServerEventHandlerRegister.registerEventLoggers(conf)
}
- override protected def stopServer(): Unit = {}
+ override protected def stopServer(): Unit = {
+ EventBus.deregisterAll()
+ }
}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerJsonLoggingEventHandlerSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerJsonLoggingEventHandlerSuite.scala
index 3bdc9cd38..7c79d6a87 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerJsonLoggingEventHandlerSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerJsonLoggingEventHandlerSuite.scala
@@ -33,6 +33,7 @@ import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
import org.apache.kyuubi._
import org.apache.kyuubi.client.util.BatchUtils._
import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.events.ServerEventHandlerRegister
import org.apache.kyuubi.operation.HiveJDBCTestHelper
import org.apache.kyuubi.operation.OperationState._
import org.apache.kyuubi.server.KyuubiServer
@@ -197,6 +198,7 @@ class ServerJsonLoggingEventHandlerSuite extends
WithKyuubiServer with HiveJDBCT
server.initialize(conf)
server.start()
server.stop()
+ ServerEventHandlerRegister.registerEventLoggers(conf) // register event
loggers again
val hostName = InetAddress.getLocalHost.getCanonicalHostName
val kyuubiServerInfoPath =