This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7faefb8  [KYUUBI #1979] Simplify Kyuubi Event
7faefb8 is described below

commit 7faefb8203d328366266f447c6a644b5925c5e35
Author: Kent Yao <[email protected]>
AuthorDate: Mon Feb 28 21:23:09 2022 +0800

    [KYUUBI #1979] Simplify Kyuubi Event
    
    ### _Why are the changes needed?_
    
    remove dead and duplicated codes for kyuubi events
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run 
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
 locally before make a pull request
    
    Closes #1979 from yaooqinn/branch-events.
    
    Closes #1979
    
    213bccd5 [Kent Yao] Simplify Events Code
    1f20a1c9 [Kent Yao] Simplify Events Code
    97a04dcd [Kent Yao] Simplify Events Code
    f326bb0d [Kent Yao] Simplify Events Code
    47c6b8df [Kent Yao] Simplify Events Code
    b18f19ba [Kent Yao] Simplify Events Code
    788b9e4e [Kent Yao] Simplify Events Code
    
    Authored-by: Kent Yao <[email protected]>
    Signed-off-by: Kent Yao <[email protected]>
---
 .../kyuubi/engine/spark/SparkSQLEngine.scala       |  8 +++---
 .../kyuubi/engine/spark/events/EngineEvent.scala   |  7 ++---
 .../engine/spark/events/EventLoggingService.scala  | 26 ++----------------
 .../engine/spark/events/KyuubiSparkEvent.scala     | 29 --------------------
 .../kyuubi/engine/spark/events/SessionEvent.scala  |  7 ++---
 .../engine/spark/events/SparkOperationEvent.scala  |  7 ++---
 .../engine/spark/operation/ExecuteStatement.scala  |  7 +++--
 .../engine/spark/session/SparkSessionImpl.scala    |  7 +++--
 .../apache/spark/kyuubi/SparkContextHelper.scala   | 16 ++++++-----
 .../spark/events/EventLoggingServiceSuite.scala    |  4 +--
 .../events/AbstractEventLoggingService.scala       | 31 ++++++++++++++++++---
 .../org/apache/kyuubi/events/EventLogger.scala     |  4 +--
 .../org/apache/kyuubi/events/JsonEventLogger.scala |  8 +++---
 .../kyuubi/events/KyuubiOperationEvent.scala       |  2 +-
 .../apache/kyuubi/events/KyuubiServerEvent.scala   | 20 --------------
 .../kyuubi/events/KyuubiServerInfoEvent.scala      |  2 +-
 .../apache/kyuubi/events/KyuubiSessionEvent.scala  |  2 +-
 .../apache/kyuubi/operation/ExecuteStatement.scala |  7 ++---
 .../apache/kyuubi/server/EventLoggingService.scala | 32 +++-------------------
 .../org/apache/kyuubi/server/KyuubiServer.scala    |  8 +++---
 .../apache/kyuubi/session/KyuubiSessionImpl.scala  |  9 +++---
 21 files changed, 85 insertions(+), 158 deletions(-)

diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index f884fec..b798cf9 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -24,7 +24,6 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.{ui, SparkConf}
 import org.apache.spark.kyuubi.{SparkContextHelper, 
SparkSQLEngineEventListener, SparkSQLEngineListener}
-import org.apache.spark.kyuubi.SparkSQLEngineListener
 import org.apache.spark.kyuubi.SparkUtilsHelper.getLocalDir
 import org.apache.spark.sql.SparkSession
 
@@ -34,6 +33,7 @@ import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.engine.spark.SparkSQLEngine.{countDownLatch, 
currentEngine}
 import org.apache.kyuubi.engine.spark.events.{EngineEvent, EngineEventsStore, 
EventLoggingService}
+import org.apache.kyuubi.events.EventLogging
 import org.apache.kyuubi.ha.HighAvailabilityConf._
 import org.apache.kyuubi.ha.client.RetryPolicies
 import org.apache.kyuubi.service.Serverable
@@ -139,7 +139,7 @@ object SparkSQLEngine extends Logging {
 
       try {
         engine.initialize(kyuubiConf)
-        EventLoggingService.onEvent(EngineEvent(engine))
+        EventLogging.onEvent(EngineEvent(engine))
       } catch {
         case t: Throwable =>
           throw new KyuubiException(s"Failed to initialize SparkSQLEngine: 
${t.getMessage}", t)
@@ -155,7 +155,7 @@ object SparkSQLEngine extends Logging {
           kyuubiConf)
         val event = EngineEvent(engine)
         info(event)
-        EventLoggingService.onEvent(event)
+        EventLogging.onEvent(event)
       } catch {
         case t: Throwable =>
           throw new KyuubiException(s"Failed to start SparkSQLEngine: 
${t.getMessage}", t)
@@ -180,7 +180,7 @@ object SparkSQLEngine extends Logging {
             case Some(engine) =>
               engine.stop()
               val event = EngineEvent(engine).copy(diagnostic = e.getMessage)
-              EventLoggingService.onEvent(event)
+              EventLogging.onEvent(event)
               error(event, e)
             case _ => error("Current SparkSQLEngine is not created.")
           }
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEvent.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEvent.scala
index 2f8eb11..da8faa3 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEvent.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEvent.scala
@@ -19,12 +19,12 @@ package org.apache.kyuubi.engine.spark.events
 
 import java.util.Date
 
-import org.apache.spark.sql.Encoders
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.scheduler.SparkListenerEvent
 
 import org.apache.kyuubi.Utils
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.engine.spark.SparkSQLEngine
+import org.apache.kyuubi.events.KyuubiEvent
 import org.apache.kyuubi.service.ServiceState
 
 /**
@@ -56,9 +56,8 @@ case class EngineEvent(
     endTime: Long,
     state: Int,
     diagnostic: String,
-    settings: Map[String, String]) extends KyuubiSparkEvent {
+    settings: Map[String, String]) extends KyuubiEvent with SparkListenerEvent 
{
 
-  override def schema: StructType = Encoders.product[EngineEvent].schema
   override lazy val partitions: Seq[(String, String)] =
     ("day", Utils.getDateFromTimestamp(startTime)) :: Nil
 
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLoggingService.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLoggingService.scala
index c015b45..249b8e9 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLoggingService.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLoggingService.scala
@@ -22,11 +22,10 @@ import org.apache.spark.kyuubi.SparkContextHelper
 
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf.{ENGINE_EVENT_JSON_LOG_PATH, 
ENGINE_EVENT_LOGGERS}
-import org.apache.kyuubi.engine.spark.events.EventLoggingService._service
 import org.apache.kyuubi.events.{AbstractEventLoggingService, EventLoggerType, 
JsonEventLogger}
 
 class EventLoggingService(spark: SparkContext)
-  extends AbstractEventLoggingService[KyuubiSparkEvent] {
+  extends AbstractEventLoggingService {
 
   override def initialize(conf: KyuubiConf): Unit = synchronized {
     conf.get(ENGINE_EVENT_LOGGERS)
@@ -35,7 +34,7 @@ class EventLoggingService(spark: SparkContext)
         case EventLoggerType.SPARK =>
           addEventLogger(SparkContextHelper.createSparkHistoryLogger(spark))
         case EventLoggerType.JSON =>
-          val jsonEventLogger = new JsonEventLogger[KyuubiSparkEvent](
+          val jsonEventLogger = new JsonEventLogger(
             spark.applicationAttemptId.getOrElse(spark.applicationId),
             ENGINE_EVENT_JSON_LOG_PATH,
             spark.hadoopConfiguration)
@@ -47,25 +46,4 @@ class EventLoggingService(spark: SparkContext)
       }
     super.initialize(conf)
   }
-
-  override def start(): Unit = synchronized {
-    super.start()
-    // expose the event logging service only when the loggers successfully 
start
-    _service = Some(this)
-  }
-
-  override def stop(): Unit = synchronized {
-    _service = None
-    super.stop()
-  }
-
-}
-
-object EventLoggingService {
-
-  private var _service: Option[EventLoggingService] = None
-
-  def onEvent(event: KyuubiSparkEvent): Unit = {
-    _service.foreach(_.onEvent(event))
-  }
 }
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/KyuubiSparkEvent.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/KyuubiSparkEvent.scala
deleted file mode 100644
index 4fb455f..0000000
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/KyuubiSparkEvent.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.engine.spark.events
-
-import org.apache.spark.scheduler.SparkListenerEvent
-import org.apache.spark.sql.types.StructType
-
-import org.apache.kyuubi.events.KyuubiEvent
-
-trait KyuubiSparkEvent extends KyuubiEvent with SparkListenerEvent {
-
-  def schema: StructType
-
-}
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala
index 670d085..b67a676 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala
@@ -18,14 +18,14 @@
 package org.apache.kyuubi.engine.spark.events
 
 import com.fasterxml.jackson.annotation.JsonIgnore
-import org.apache.spark.sql.Encoders
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.scheduler.SparkListenerEvent
 import org.apache.spark.util.kvstore.KVIndex
 
 import org.apache.kyuubi.Utils
 import org.apache.kyuubi.engine.spark.KyuubiSparkUtil
 import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.KVIndexParam
 import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
+import org.apache.kyuubi.events.KyuubiEvent
 
 /**
  * Event Tracking for user sessions
@@ -45,9 +45,8 @@ case class SessionEvent(
     serverIp: String,
     startTime: Long,
     var endTime: Long = -1L,
-    var totalOperations: Int = 0) extends KyuubiSparkEvent {
+    var totalOperations: Int = 0) extends KyuubiEvent with SparkListenerEvent {
 
-  override def schema: StructType = Encoders.product[SessionEvent].schema
   override lazy val partitions: Seq[(String, String)] =
     ("day", Utils.getDateFromTimestamp(startTime)) :: Nil
 
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkOperationEvent.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkOperationEvent.scala
index d496989..22eb8ac 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkOperationEvent.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkOperationEvent.scala
@@ -18,13 +18,13 @@
 package org.apache.kyuubi.engine.spark.events
 
 import com.fasterxml.jackson.annotation.JsonIgnore
-import org.apache.spark.sql.Encoders
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.scheduler.SparkListenerEvent
 import org.apache.spark.util.kvstore.KVIndex
 
 import org.apache.kyuubi.Utils
 import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.KVIndexParam
 import org.apache.kyuubi.engine.spark.operation.SparkOperation
+import org.apache.kyuubi.events.KyuubiEvent
 
 /**
  * A [[SparkOperationEvent]] used to tracker the lifecycle of an operation at 
Spark SQL Engine side.
@@ -59,9 +59,8 @@ case class SparkOperationEvent(
     exception: Option[Throwable],
     sessionId: String,
     sessionUser: String,
-    executionId: Option[Long]) extends KyuubiSparkEvent {
+    executionId: Option[Long]) extends KyuubiEvent with SparkListenerEvent {
 
-  override def schema: StructType = 
Encoders.product[SparkOperationEvent].schema
   override def partitions: Seq[(String, String)] =
     ("day", Utils.getDateFromTimestamp(createTime)) :: Nil
 
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index 450ec4b..7db2387 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -27,7 +27,8 @@ import org.apache.spark.sql.types._
 
 import org.apache.kyuubi.{KyuubiSQLException, Logging}
 import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
-import org.apache.kyuubi.engine.spark.events.{EventLoggingService, 
SparkOperationEvent}
+import org.apache.kyuubi.engine.spark.events.SparkOperationEvent
+import org.apache.kyuubi.events.EventLogging
 import org.apache.kyuubi.operation.{ArrayFetchIterator, IterableFetchIterator, 
OperationState, OperationType}
 import org.apache.kyuubi.operation.OperationState.OperationState
 import org.apache.kyuubi.operation.log.OperationLog
@@ -49,7 +50,7 @@ class ExecuteStatement(
 
   private val operationListener: SQLOperationListener = new 
SQLOperationListener(this, spark)
 
-  EventLoggingService.onEvent(SparkOperationEvent(this))
+  EventLogging.onEvent(SparkOperationEvent(this))
 
   override protected def resultSchema: StructType = {
     if (result == null || result.schema.isEmpty) {
@@ -146,7 +147,7 @@ class ExecuteStatement(
 
   override def setState(newState: OperationState): Unit = {
     super.setState(newState)
-    EventLoggingService.onEvent(
+    EventLogging.onEvent(
       SparkOperationEvent(this, operationListener.getExecutionId))
   }
 }
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
index 95d22ea..1dc59ee 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
@@ -20,9 +20,10 @@ package org.apache.kyuubi.engine.spark.session
 import org.apache.hive.service.rpc.thrift.TProtocolVersion
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 
-import org.apache.kyuubi.engine.spark.events.{EventLoggingService, 
SessionEvent}
+import org.apache.kyuubi.engine.spark.events.SessionEvent
 import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
 import org.apache.kyuubi.engine.spark.udf.KDFRegistry
+import org.apache.kyuubi.events.EventLogging
 import org.apache.kyuubi.operation.{Operation, OperationHandle}
 import org.apache.kyuubi.session.{AbstractSession, SessionHandle, 
SessionManager}
 
@@ -56,7 +57,7 @@ class SparkSessionImpl(
       case (key, value) => setModifiableConfig(key, value)
     }
     KDFRegistry.registerAll(spark)
-    EventLoggingService.onEvent(sessionEvent)
+    EventLogging.onEvent(sessionEvent)
     super.open()
   }
 
@@ -67,7 +68,7 @@ class SparkSessionImpl(
 
   override def close(): Unit = {
     sessionEvent.endTime = System.currentTimeMillis()
-    EventLoggingService.onEvent(sessionEvent)
+    EventLogging.onEvent(sessionEvent)
     super.close()
     
spark.sessionState.catalog.getTempViewNames().foreach(spark.catalog.uncacheTable(_))
     
sessionManager.operationManager.asInstanceOf[SparkSQLOperationManager].closeILoop(handle)
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkContextHelper.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkContextHelper.scala
index ba67151..802c0f6 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkContextHelper.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkContextHelper.scala
@@ -20,7 +20,7 @@ package org.apache.spark.kyuubi
 import org.apache.hadoop.security.Credentials
 import org.apache.spark.SparkContext
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.scheduler.SchedulerBackend
+import org.apache.spark.scheduler.{SchedulerBackend, SparkListenerEvent}
 import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
@@ -29,8 +29,7 @@ import org.apache.spark.ui.SparkUI
 
 import org.apache.kyuubi.Logging
 import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_STATEMENT_ID_KEY
-import org.apache.kyuubi.engine.spark.events.KyuubiSparkEvent
-import org.apache.kyuubi.events.EventLogger
+import org.apache.kyuubi.events.{EventLogger, KyuubiEvent}
 
 /**
  * A place to invoke non-public APIs of [[SparkContext]], anything to be added 
here need to
@@ -38,7 +37,7 @@ import org.apache.kyuubi.events.EventLogger
  */
 object SparkContextHelper extends Logging {
 
-  def createSparkHistoryLogger(sc: SparkContext): 
EventLogger[KyuubiSparkEvent] = {
+  def createSparkHistoryLogger(sc: SparkContext): EventLogger = {
     new SparkHistoryEventLogger(sc)
   }
 
@@ -87,8 +86,11 @@ object SparkContextHelper extends Logging {
  * A [[EventLogger]] that logs everything to SparkHistory
  * @param sc SparkContext
  */
-private class SparkHistoryEventLogger(sc: SparkContext) extends 
EventLogger[KyuubiSparkEvent] {
-  override def logEvent(kyuubiEvent: KyuubiSparkEvent): Unit = {
-    sc.listenerBus.post(kyuubiEvent)
+private class SparkHistoryEventLogger(sc: SparkContext) extends EventLogger {
+  override def logEvent(kyuubiEvent: KyuubiEvent): Unit = {
+    kyuubiEvent match {
+      case s: SparkListenerEvent => sc.listenerBus.post(s)
+      case _ => // ignore
+    }
   }
 }
diff --git 
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EventLoggingServiceSuite.scala
 
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EventLoggingServiceSuite.scala
index 495ae59..007ff67 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EventLoggingServiceSuite.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EventLoggingServiceSuite.scala
@@ -62,8 +62,8 @@ class EventLoggingServiceSuite extends WithSparkSQLEngine 
with HiveJDBCTestHelpe
     val engineEventReader = new BufferedReader(new InputStreamReader(fs))
 
     val readEvent =
-      JsonProtocol.jsonToEvent(engineEventReader.readLine(), 
classOf[KyuubiSparkEvent])
-    assert(readEvent.isInstanceOf[KyuubiSparkEvent])
+      JsonProtocol.jsonToEvent(engineEventReader.readLine(), 
classOf[EngineEvent])
+    assert(readEvent.isInstanceOf[EngineEvent])
 
     withJdbcStatement() { statement =>
       val table = engineEventPath.getParent
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/events/AbstractEventLoggingService.scala
 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/events/AbstractEventLoggingService.scala
index 08ad39f..248d3c7 100644
--- 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/events/AbstractEventLoggingService.scala
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/events/AbstractEventLoggingService.scala
@@ -21,16 +21,39 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.kyuubi.service.CompositeService
 
-abstract class AbstractEventLoggingService[T <: KyuubiEvent]
+abstract class AbstractEventLoggingService
   extends CompositeService("EventLogging") {
 
-  private val eventLoggers = new ArrayBuffer[EventLogger[T]]()
+  import EventLogging._
 
-  def onEvent(event: T): Unit = {
+  private val eventLoggers = new ArrayBuffer[EventLogger]()
+
+  def onEvent(event: KyuubiEvent): Unit = {
     eventLoggers.foreach(_.logEvent(event))
   }
 
-  def addEventLogger(logger: EventLogger[T]): Unit = {
+  def addEventLogger(logger: EventLogger): Unit = {
     eventLoggers += logger
   }
+
+  override def start(): Unit = synchronized {
+    super.start()
+    // expose the event logging service only when the loggers successfully 
start
+    _service = Some(this)
+  }
+
+  override def stop(): Unit = synchronized {
+    _service = None
+    super.stop()
+  }
+
+}
+
+object EventLogging {
+
+  private[events] var _service: Option[AbstractEventLoggingService] = None
+
+  def onEvent(event: KyuubiEvent): Unit = {
+    _service.foreach(_.onEvent(event))
+  }
 }
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/events/EventLogger.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/events/EventLogger.scala
index e56136e..875fc0b 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/events/EventLogger.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/events/EventLogger.scala
@@ -17,8 +17,8 @@
 
 package org.apache.kyuubi.events
 
-trait EventLogger[T <: KyuubiEvent] {
+trait EventLogger {
 
-  def logEvent(kyuubiEvent: T): Unit
+  def logEvent(kyuubiEvent: KyuubiEvent): Unit
 
 }
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/events/JsonEventLogger.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/events/JsonEventLogger.scala
index f85f28c..9537a7e 100644
--- 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/events/JsonEventLogger.scala
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/events/JsonEventLogger.scala
@@ -41,11 +41,11 @@ import org.apache.kyuubi.service.AbstractService
  * The ${date} is based on the time of events, e.g. engine.startTime, 
statement.startTime
  * @param logName the engine id formed of appId + attemptId(if any)
  */
-class JsonEventLogger[T <: KyuubiEvent](
+class JsonEventLogger(
     logName: String,
     logPath: ConfigEntry[String],
     hadoopConf: Configuration)
-  extends AbstractService("JsonEventLogger") with EventLogger[T] with Logging {
+  extends AbstractService("JsonEventLogger") with EventLogger with Logging {
 
   type Logger = (PrintWriter, Option[FSDataOutputStream])
 
@@ -105,7 +105,7 @@ class JsonEventLogger[T <: KyuubiEvent](
     super.stop()
   }
 
-  override def logEvent(kyuubiEvent: T): Unit = {
+  override def logEvent(kyuubiEvent: KyuubiEvent): Unit = {
     val (writer, stream) = getOrUpdate(kyuubiEvent)
     // scalastyle:off println
     writer.println(kyuubiEvent.toJson)
@@ -119,7 +119,7 @@ class JsonEventLogger[T <: KyuubiEvent](
     val logRoot: URI = URI.create(conf.get(ENGINE_EVENT_JSON_LOG_PATH))
     val fs: FileSystem = FileSystem.get(logRoot, hadoopConf)
     val success: Boolean = FileSystem.mkdirs(fs, new Path(logRoot), 
JSON_LOG_DIR_PERM)
-    if (success == false) {
+    if (!success) {
       val fileStatus = fs.getFileStatus(new Path(logRoot))
       if (!fileStatus.isDirectory) {
         throw new IllegalArgumentException(s"Log directory $logRoot is not a 
directory.")
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiOperationEvent.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiOperationEvent.scala
index 0ecb732..34c2360 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiOperationEvent.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiOperationEvent.scala
@@ -53,7 +53,7 @@ case class KyuubiOperationEvent private (
     completeTime: Long,
     exception: Option[Throwable],
     sessionId: String,
-    sessionUser: String) extends KyuubiServerEvent {
+    sessionUser: String) extends KyuubiEvent {
 
   // operation events are partitioned by the date when the corresponding 
operations are
   // created.
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiServerEvent.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiServerEvent.scala
deleted file mode 100644
index aee2ed5..0000000
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiServerEvent.scala
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.events
-
-trait KyuubiServerEvent extends KyuubiEvent {}
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiServerInfoEvent.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiServerInfoEvent.scala
index 18af357..95d826b 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiServerInfoEvent.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiServerInfoEvent.scala
@@ -39,7 +39,7 @@ case class KyuubiServerInfoEvent private (
     state: String,
     serverIP: String,
     serverConf: Map[String, String],
-    serverEnv: Map[String, String]) extends KyuubiServerEvent {
+    serverEnv: Map[String, String]) extends KyuubiEvent {
 
   val BUILD_USER: String = P_BUILD_USER
   val BUILD_DATE: String = P_BUILD_DATE
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiSessionEvent.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiSessionEvent.scala
index 9196c4d..a56b895 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiSessionEvent.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiSessionEvent.scala
@@ -51,7 +51,7 @@ case class KyuubiSessionEvent(
     var engineId: String = "",
     var openedTime: Long = -1L,
     var endTime: Long = -1L,
-    var totalOperations: Int = 0) extends KyuubiServerEvent {
+    var totalOperations: Int = 0) extends KyuubiEvent {
   override def partitions: Seq[(String, String)] =
     ("day", Utils.getDateFromTimestamp(startTime)) :: Nil
 }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
index 3a9b265..74b8656 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
@@ -25,11 +25,10 @@ import org.apache.thrift.TException
 
 import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.events.KyuubiOperationEvent
+import org.apache.kyuubi.events.{EventLogging, KyuubiOperationEvent}
 import org.apache.kyuubi.operation.FetchOrientation.FETCH_NEXT
 import org.apache.kyuubi.operation.OperationState.OperationState
 import org.apache.kyuubi.operation.log.OperationLog
-import org.apache.kyuubi.server.EventLoggingService
 import org.apache.kyuubi.session.{KyuubiSessionImpl, KyuubiSessionManager, 
Session}
 
 class ExecuteStatement(
@@ -39,7 +38,7 @@ class ExecuteStatement(
     override val shouldRunAsync: Boolean,
     queryTimeout: Long)
   extends KyuubiOperation(OperationType.EXECUTE_STATEMENT, session) {
-  EventLoggingService.onEvent(KyuubiOperationEvent(this))
+  EventLogging.onEvent(KyuubiOperationEvent(this))
 
   final private val _operationLog: OperationLog =
     if (shouldRunAsync) {
@@ -175,7 +174,7 @@ class ExecuteStatement(
 
   override def setState(newState: OperationState): Unit = {
     super.setState(newState)
-    EventLoggingService.onEvent(KyuubiOperationEvent(this))
+    EventLogging.onEvent(KyuubiOperationEvent(this))
   }
 
   override def close(): Unit = {
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/EventLoggingService.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/EventLoggingService.scala
index ee7e65e..adb3e12 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/EventLoggingService.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/EventLoggingService.scala
@@ -20,15 +20,11 @@ package org.apache.kyuubi.server
 import java.net.InetAddress
 
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.SERVER_EVENT_JSON_LOG_PATH
-import org.apache.kyuubi.config.KyuubiConf.SERVER_EVENT_LOGGERS
-import org.apache.kyuubi.events.{AbstractEventLoggingService, EventLoggerType}
-import org.apache.kyuubi.events.JsonEventLogger
-import org.apache.kyuubi.events.KyuubiServerEvent
-import org.apache.kyuubi.server.EventLoggingService._service
+import org.apache.kyuubi.config.KyuubiConf.{SERVER_EVENT_JSON_LOG_PATH, 
SERVER_EVENT_LOGGERS}
+import org.apache.kyuubi.events.{AbstractEventLoggingService, EventLoggerType, 
JsonEventLogger}
 import org.apache.kyuubi.util.KyuubiHadoopUtils
 
-class EventLoggingService extends 
AbstractEventLoggingService[KyuubiServerEvent] {
+class EventLoggingService extends AbstractEventLoggingService {
 
   override def initialize(conf: KyuubiConf): Unit = {
     val hadoopConf = KyuubiHadoopUtils.newHadoopConf(conf)
@@ -37,7 +33,7 @@ class EventLoggingService extends 
AbstractEventLoggingService[KyuubiServerEvent]
       .foreach {
         case EventLoggerType.JSON =>
           val hostName = InetAddress.getLocalHost.getCanonicalHostName
-          val jsonEventLogger = new JsonEventLogger[KyuubiServerEvent](
+          val jsonEventLogger = new JsonEventLogger(
             s"server-$hostName",
             SERVER_EVENT_JSON_LOG_PATH,
             hadoopConf)
@@ -51,24 +47,4 @@ class EventLoggingService extends 
AbstractEventLoggingService[KyuubiServerEvent]
       }
     super.initialize(conf)
   }
-
-  override def start(): Unit = {
-    _service = Some(this)
-    super.start()
-  }
-
-  override def stop(): Unit = {
-    _service = None
-    super.stop()
-  }
-
-}
-
-object EventLoggingService {
-
-  private var _service: Option[EventLoggingService] = None
-
-  def onEvent(event: KyuubiServerEvent): Unit = {
-    _service.foreach(_.onEvent(event))
-  }
 }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
index 003112b..3f6e34e 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
@@ -31,7 +31,7 @@ import org.apache.kyuubi._
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf.{FRONTEND_PROTOCOLS, 
FrontendProtocols}
 import org.apache.kyuubi.config.KyuubiConf.FrontendProtocols._
-import org.apache.kyuubi.events.KyuubiServerInfoEvent
+import org.apache.kyuubi.events.{EventLogging, KyuubiServerInfoEvent}
 import org.apache.kyuubi.ha.HighAvailabilityConf._
 import org.apache.kyuubi.ha.client.{ServiceDiscovery, ZooKeeperAuthTypes}
 import org.apache.kyuubi.ha.client.ZooKeeperClientProvider._
@@ -154,7 +154,7 @@ class KyuubiServer(name: String) extends Serverable(name) {
         throw new UnsupportedOperationException(s"Frontend protocol $other is 
not supported yet.")
     }
 
-  private val eventLoggingService: EventLoggingService = new 
EventLoggingService
+  private val eventLoggingService = new EventLoggingService()
 
   override def initialize(conf: KyuubiConf): Unit = synchronized {
     val kinit = new KinitAuxiliaryService()
@@ -170,11 +170,11 @@ class KyuubiServer(name: String) extends Serverable(name) 
{
   override def start(): Unit = {
     super.start()
     KyuubiServer.kyuubiServer = this
-    KyuubiServerInfoEvent(this, 
ServiceState.STARTED).foreach(EventLoggingService.onEvent)
+    KyuubiServerInfoEvent(this, 
ServiceState.STARTED).foreach(EventLogging.onEvent)
   }
 
   override def stop(): Unit = {
-    KyuubiServerInfoEvent(this, 
ServiceState.STOPPED).foreach(EventLoggingService.onEvent)
+    KyuubiServerInfoEvent(this, 
ServiceState.STOPPED).foreach(EventLogging.onEvent)
     super.stop()
   }
 
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index 3ffe713..a460302 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -27,13 +27,12 @@ import org.apache.kyuubi.client.KyuubiSyncThriftClient
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.engine.EngineRef
-import org.apache.kyuubi.events.{KyuubiEvent, KyuubiSessionEvent}
+import org.apache.kyuubi.events.{EventLogging, KyuubiEvent, KyuubiSessionEvent}
 import org.apache.kyuubi.ha.client.ZooKeeperClientProvider._
 import org.apache.kyuubi.metrics.MetricsConstants._
 import org.apache.kyuubi.metrics.MetricsSystem
 import org.apache.kyuubi.operation.{Operation, OperationHandle}
 import org.apache.kyuubi.operation.log.OperationLog
-import org.apache.kyuubi.server.EventLoggingService
 import org.apache.kyuubi.service.authentication.EngineSecurityAccessor
 
 class KyuubiSessionImpl(
@@ -71,7 +70,7 @@ class KyuubiSessionImpl(
     .newLaunchEngineOperation(this, 
sessionConf.get(SESSION_ENGINE_LAUNCH_ASYNC))
 
   private val sessionEvent = KyuubiSessionEvent(this)
-  EventLoggingService.onEvent(sessionEvent)
+  EventLogging.onEvent(sessionEvent)
 
   override def getSessionEvent: Option[KyuubiEvent] = {
     Option(sessionEvent)
@@ -109,7 +108,7 @@ class KyuubiSessionImpl(
       sessionEvent.openedTime = System.currentTimeMillis()
       sessionEvent.remoteSessionId = _engineSessionHandle.identifier.toString
       _client.engineId.foreach(e => sessionEvent.engineId = e)
-      EventLoggingService.onEvent(sessionEvent)
+      EventLogging.onEvent(sessionEvent)
     }
   }
 
@@ -155,7 +154,7 @@ class KyuubiSessionImpl(
       if (_client != null) _client.closeSession()
     } finally {
       sessionEvent.endTime = System.currentTimeMillis()
-      EventLoggingService.onEvent(sessionEvent)
+      EventLogging.onEvent(sessionEvent)
       MetricsSystem.tracing(_.decCount(MetricRegistry.name(CONN_OPEN, user)))
     }
   }

Reply via email to