This is an automated email from the ASF dual-hosted git repository.

bowenliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 430f6d590 [KYUUBI #4777] Deregister event handlers when stopping 
server with event handler made auto-closeable
430f6d590 is described below

commit 430f6d5901cfb79267db93dffaa3dc3bb598592f
Author: liangbowen <[email protected]>
AuthorDate: Fri Apr 28 09:59:48 2023 +0800

    [KYUUBI #4777] Deregister event handlers when stopping server with event 
handler made auto-closeable
    
    ### _Why are the changes needed?_
    
    - deregister event handlers when stopping KyuubiServer, by deregistering 
them from EventBus's handler Registry
    - change `EventHandler` from `type` to `trait` and make it extend 
`AutoCloseable`
    - implement `close` method in `JsonLoggingEventHandler` for closing writers 
and streams
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run 
test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests)
 locally before making a pull request
    
    Closes #4777 from bowenliang123/closable-eventlogger.
    
    Closes #4777
    
    db1ad5d73 [liangbowen] make EventBus.deregisterAll method synchronized
    648471ba1 [liangbowen] update
    d28931d3c [liangbowen] re-register event loggers in ut
    7121fa33a [liangbowen] make EventHandler closable, and de-register all 
event handlers when stopping server
    
    Authored-by: liangbowen <[email protected]>
    Signed-off-by: liangbowen <[email protected]>
---
 .../main/scala/org/apache/kyuubi/events/EventBus.scala   | 16 ++++++++++++++++
 .../kyuubi/events/handler/JsonLoggingEventHandler.scala  | 16 ++++++++++++++--
 .../scala/org/apache/kyuubi/events/handler/package.scala |  6 +++++-
 .../scala/org/apache/kyuubi/server/KyuubiServer.scala    |  4 +++-
 .../handler/ServerJsonLoggingEventHandlerSuite.scala     |  2 ++
 5 files changed, 40 insertions(+), 4 deletions(-)

diff --git 
a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventBus.scala 
b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventBus.scala
index e854e40a7..063f1719e 100644
--- a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventBus.scala
+++ b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/EventBus.scala
@@ -40,6 +40,8 @@ sealed trait EventBus {
   def register[T <: KyuubiEvent: ClassTag](eventHandler: EventHandler[T]): 
EventBus
 
   def registerAsync[T <: KyuubiEvent: ClassTag](eventHandler: 
EventHandler[T]): EventBus
+
+  def deregisterAll(): Unit = {}
 }
 
 object EventBus extends Logging {
@@ -68,6 +70,10 @@ object EventBus extends Logging {
   def registerAsync[T <: KyuubiEvent: ClassTag](et: EventHandler[T]): EventBus 
=
     defaultEventBus.registerAsync[T](et)
 
+  def deregisterAll(): Unit = synchronized {
+    defaultEventBus.deregisterAll()
+  }
+
   private case class EventBusLive() extends EventBus {
     private[this] lazy val eventHandlerRegistry = new Registry
     private[this] lazy val asyncEventHandlerRegistry = new Registry
@@ -96,6 +102,11 @@ object EventBus extends Logging {
       asyncEventHandlerRegistry.register(et)
       this
     }
+
+    override def deregisterAll(): Unit = {
+      eventHandlerRegistry.deregisterAll()
+      asyncEventHandlerRegistry.deregisterAll()
+    }
   }
 
   private class Registry {
@@ -122,5 +133,10 @@ object EventBus extends Logging {
       } yield parent
       clazz :: parents
     }
+
+    def deregisterAll(): Unit = {
+      eventHandlers.values.flatten.foreach(_.close())
+      eventHandlers.clear()
+    }
   }
 }
diff --git 
a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/JsonLoggingEventHandler.scala
 
b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/JsonLoggingEventHandler.scala
index f6f74de9a..77d80b152 100644
--- 
a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/JsonLoggingEventHandler.scala
+++ 
b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/JsonLoggingEventHandler.scala
@@ -65,6 +65,17 @@ class JsonLoggingEventHandler(
     stream.foreach(_.hflush())
   }
 
+  override def close(): Unit = {
+    writers.values.foreach { case (writer, stream) =>
+      writer.flush()
+      stream.foreach(_.hflush())
+      writer.close()
+      stream.foreach(_.close())
+    }
+    writers.clear()
+    fs = null
+  }
+
   private def getOrUpdate(event: KyuubiEvent): Logger = synchronized {
     val partitions = event.partitions.map(kv => 
s"${kv._1}=${kv._2}").mkString(Path.SEPARATOR)
     writers.getOrElseUpdate(
@@ -108,6 +119,7 @@ class JsonLoggingEventHandler(
 }
 
 object JsonLoggingEventHandler {
-  val JSON_LOG_DIR_PERM: FsPermission = new 
FsPermission(Integer.parseInt("770", 8).toShort)
-  val JSON_LOG_FILE_PERM: FsPermission = new 
FsPermission(Integer.parseInt("660", 8).toShort)
+  private val JSON_LOG_DIR_PERM: FsPermission = new 
FsPermission(Integer.parseInt("770", 8).toShort)
+  private val JSON_LOG_FILE_PERM: FsPermission =
+    new FsPermission(Integer.parseInt("660", 8).toShort)
 }
diff --git 
a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/package.scala 
b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/package.scala
index 41cf001ed..69e1fdcee 100644
--- 
a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/package.scala
+++ 
b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/handler/package.scala
@@ -18,5 +18,9 @@
 package org.apache.kyuubi.events
 
 package object handler {
-  type EventHandler[T <: KyuubiEvent] = T => Unit
+  trait EventHandler[T <: KyuubiEvent] extends AutoCloseable {
+    def apply(event: T): Unit
+
+    def close(): Unit = {}
+  }
 }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
index a7f2e8178..8bcd8d084 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
@@ -193,5 +193,7 @@ class KyuubiServer(name: String) extends Serverable(name) {
     ServerEventHandlerRegister.registerEventLoggers(conf)
   }
 
-  override protected def stopServer(): Unit = {}
+  override protected def stopServer(): Unit = {
+    EventBus.deregisterAll()
+  }
 }
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerJsonLoggingEventHandlerSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerJsonLoggingEventHandlerSuite.scala
index 3bdc9cd38..7c79d6a87 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerJsonLoggingEventHandlerSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerJsonLoggingEventHandlerSuite.scala
@@ -33,6 +33,7 @@ import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
 import org.apache.kyuubi._
 import org.apache.kyuubi.client.util.BatchUtils._
 import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.events.ServerEventHandlerRegister
 import org.apache.kyuubi.operation.HiveJDBCTestHelper
 import org.apache.kyuubi.operation.OperationState._
 import org.apache.kyuubi.server.KyuubiServer
@@ -197,6 +198,7 @@ class ServerJsonLoggingEventHandlerSuite extends 
WithKyuubiServer with HiveJDBCT
     server.initialize(conf)
     server.start()
     server.stop()
+    ServerEventHandlerRegister.registerEventLoggers(conf) // register event 
loggers again
 
     val hostName = InetAddress.getLocalHost.getCanonicalHostName
     val kyuubiServerInfoPath =

Reply via email to