This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 7faefb8 [KYUUBI #1979] Simplify Kyuubi Event
7faefb8 is described below
commit 7faefb8203d328366266f447c6a644b5925c5e35
Author: Kent Yao <[email protected]>
AuthorDate: Mon Feb 28 21:23:09 2022 +0800
[KYUUBI #1979] Simplify Kyuubi Event
### _Why are the changes needed?_
Remove dead and duplicated code for Kyuubi events.
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #1979 from yaooqinn/branch-events.
Closes #1979
213bccd5 [Kent Yao] Simplify Events Code
1f20a1c9 [Kent Yao] Simplify Events Code
97a04dcd [Kent Yao] Simplify Events Code
f326bb0d [Kent Yao] Simplify Events Code
47c6b8df [Kent Yao] Simplify Events Code
b18f19ba [Kent Yao] Simplify Events Code
788b9e4e [Kent Yao] Simplify Events Code
Authored-by: Kent Yao <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
---
.../kyuubi/engine/spark/SparkSQLEngine.scala | 8 +++---
.../kyuubi/engine/spark/events/EngineEvent.scala | 7 ++---
.../engine/spark/events/EventLoggingService.scala | 26 ++----------------
.../engine/spark/events/KyuubiSparkEvent.scala | 29 --------------------
.../kyuubi/engine/spark/events/SessionEvent.scala | 7 ++---
.../engine/spark/events/SparkOperationEvent.scala | 7 ++---
.../engine/spark/operation/ExecuteStatement.scala | 7 +++--
.../engine/spark/session/SparkSessionImpl.scala | 7 +++--
.../apache/spark/kyuubi/SparkContextHelper.scala | 16 ++++++-----
.../spark/events/EventLoggingServiceSuite.scala | 4 +--
.../events/AbstractEventLoggingService.scala | 31 ++++++++++++++++++---
.../org/apache/kyuubi/events/EventLogger.scala | 4 +--
.../org/apache/kyuubi/events/JsonEventLogger.scala | 8 +++---
.../kyuubi/events/KyuubiOperationEvent.scala | 2 +-
.../apache/kyuubi/events/KyuubiServerEvent.scala | 20 --------------
.../kyuubi/events/KyuubiServerInfoEvent.scala | 2 +-
.../apache/kyuubi/events/KyuubiSessionEvent.scala | 2 +-
.../apache/kyuubi/operation/ExecuteStatement.scala | 7 ++---
.../apache/kyuubi/server/EventLoggingService.scala | 32 +++-------------------
.../org/apache/kyuubi/server/KyuubiServer.scala | 8 +++---
.../apache/kyuubi/session/KyuubiSessionImpl.scala | 9 +++---
21 files changed, 85 insertions(+), 158 deletions(-)
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index f884fec..b798cf9 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -24,7 +24,6 @@ import scala.util.control.NonFatal
import org.apache.spark.{ui, SparkConf}
import org.apache.spark.kyuubi.{SparkContextHelper,
SparkSQLEngineEventListener, SparkSQLEngineListener}
-import org.apache.spark.kyuubi.SparkSQLEngineListener
import org.apache.spark.kyuubi.SparkUtilsHelper.getLocalDir
import org.apache.spark.sql.SparkSession
@@ -34,6 +33,7 @@ import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.spark.SparkSQLEngine.{countDownLatch,
currentEngine}
import org.apache.kyuubi.engine.spark.events.{EngineEvent, EngineEventsStore,
EventLoggingService}
+import org.apache.kyuubi.events.EventLogging
import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.client.RetryPolicies
import org.apache.kyuubi.service.Serverable
@@ -139,7 +139,7 @@ object SparkSQLEngine extends Logging {
try {
engine.initialize(kyuubiConf)
- EventLoggingService.onEvent(EngineEvent(engine))
+ EventLogging.onEvent(EngineEvent(engine))
} catch {
case t: Throwable =>
throw new KyuubiException(s"Failed to initialize SparkSQLEngine:
${t.getMessage}", t)
@@ -155,7 +155,7 @@ object SparkSQLEngine extends Logging {
kyuubiConf)
val event = EngineEvent(engine)
info(event)
- EventLoggingService.onEvent(event)
+ EventLogging.onEvent(event)
} catch {
case t: Throwable =>
throw new KyuubiException(s"Failed to start SparkSQLEngine:
${t.getMessage}", t)
@@ -180,7 +180,7 @@ object SparkSQLEngine extends Logging {
case Some(engine) =>
engine.stop()
val event = EngineEvent(engine).copy(diagnostic = e.getMessage)
- EventLoggingService.onEvent(event)
+ EventLogging.onEvent(event)
error(event, e)
case _ => error("Current SparkSQLEngine is not created.")
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEvent.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEvent.scala
index 2f8eb11..da8faa3 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEvent.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEvent.scala
@@ -19,12 +19,12 @@ package org.apache.kyuubi.engine.spark.events
import java.util.Date
-import org.apache.spark.sql.Encoders
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.scheduler.SparkListenerEvent
import org.apache.kyuubi.Utils
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.spark.SparkSQLEngine
+import org.apache.kyuubi.events.KyuubiEvent
import org.apache.kyuubi.service.ServiceState
/**
@@ -56,9 +56,8 @@ case class EngineEvent(
endTime: Long,
state: Int,
diagnostic: String,
- settings: Map[String, String]) extends KyuubiSparkEvent {
+ settings: Map[String, String]) extends KyuubiEvent with SparkListenerEvent
{
- override def schema: StructType = Encoders.product[EngineEvent].schema
override lazy val partitions: Seq[(String, String)] =
("day", Utils.getDateFromTimestamp(startTime)) :: Nil
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLoggingService.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLoggingService.scala
index c015b45..249b8e9 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLoggingService.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLoggingService.scala
@@ -22,11 +22,10 @@ import org.apache.spark.kyuubi.SparkContextHelper
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_EVENT_JSON_LOG_PATH,
ENGINE_EVENT_LOGGERS}
-import org.apache.kyuubi.engine.spark.events.EventLoggingService._service
import org.apache.kyuubi.events.{AbstractEventLoggingService, EventLoggerType,
JsonEventLogger}
class EventLoggingService(spark: SparkContext)
- extends AbstractEventLoggingService[KyuubiSparkEvent] {
+ extends AbstractEventLoggingService {
override def initialize(conf: KyuubiConf): Unit = synchronized {
conf.get(ENGINE_EVENT_LOGGERS)
@@ -35,7 +34,7 @@ class EventLoggingService(spark: SparkContext)
case EventLoggerType.SPARK =>
addEventLogger(SparkContextHelper.createSparkHistoryLogger(spark))
case EventLoggerType.JSON =>
- val jsonEventLogger = new JsonEventLogger[KyuubiSparkEvent](
+ val jsonEventLogger = new JsonEventLogger(
spark.applicationAttemptId.getOrElse(spark.applicationId),
ENGINE_EVENT_JSON_LOG_PATH,
spark.hadoopConfiguration)
@@ -47,25 +46,4 @@ class EventLoggingService(spark: SparkContext)
}
super.initialize(conf)
}
-
- override def start(): Unit = synchronized {
- super.start()
- // expose the event logging service only when the loggers successfully
start
- _service = Some(this)
- }
-
- override def stop(): Unit = synchronized {
- _service = None
- super.stop()
- }
-
-}
-
-object EventLoggingService {
-
- private var _service: Option[EventLoggingService] = None
-
- def onEvent(event: KyuubiSparkEvent): Unit = {
- _service.foreach(_.onEvent(event))
- }
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/KyuubiSparkEvent.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/KyuubiSparkEvent.scala
deleted file mode 100644
index 4fb455f..0000000
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/KyuubiSparkEvent.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.engine.spark.events
-
-import org.apache.spark.scheduler.SparkListenerEvent
-import org.apache.spark.sql.types.StructType
-
-import org.apache.kyuubi.events.KyuubiEvent
-
-trait KyuubiSparkEvent extends KyuubiEvent with SparkListenerEvent {
-
- def schema: StructType
-
-}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala
index 670d085..b67a676 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala
@@ -18,14 +18,14 @@
package org.apache.kyuubi.engine.spark.events
import com.fasterxml.jackson.annotation.JsonIgnore
-import org.apache.spark.sql.Encoders
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.scheduler.SparkListenerEvent
import org.apache.spark.util.kvstore.KVIndex
import org.apache.kyuubi.Utils
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.KVIndexParam
import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
+import org.apache.kyuubi.events.KyuubiEvent
/**
* Event Tracking for user sessions
@@ -45,9 +45,8 @@ case class SessionEvent(
serverIp: String,
startTime: Long,
var endTime: Long = -1L,
- var totalOperations: Int = 0) extends KyuubiSparkEvent {
+ var totalOperations: Int = 0) extends KyuubiEvent with SparkListenerEvent {
- override def schema: StructType = Encoders.product[SessionEvent].schema
override lazy val partitions: Seq[(String, String)] =
("day", Utils.getDateFromTimestamp(startTime)) :: Nil
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkOperationEvent.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkOperationEvent.scala
index d496989..22eb8ac 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkOperationEvent.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkOperationEvent.scala
@@ -18,13 +18,13 @@
package org.apache.kyuubi.engine.spark.events
import com.fasterxml.jackson.annotation.JsonIgnore
-import org.apache.spark.sql.Encoders
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.scheduler.SparkListenerEvent
import org.apache.spark.util.kvstore.KVIndex
import org.apache.kyuubi.Utils
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.KVIndexParam
import org.apache.kyuubi.engine.spark.operation.SparkOperation
+import org.apache.kyuubi.events.KyuubiEvent
/**
* A [[SparkOperationEvent]] used to tracker the lifecycle of an operation at
Spark SQL Engine side.
@@ -59,9 +59,8 @@ case class SparkOperationEvent(
exception: Option[Throwable],
sessionId: String,
sessionUser: String,
- executionId: Option[Long]) extends KyuubiSparkEvent {
+ executionId: Option[Long]) extends KyuubiEvent with SparkListenerEvent {
- override def schema: StructType =
Encoders.product[SparkOperationEvent].schema
override def partitions: Seq[(String, String)] =
("day", Utils.getDateFromTimestamp(createTime)) :: Nil
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index 450ec4b..7db2387 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -27,7 +27,8 @@ import org.apache.spark.sql.types._
import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
-import org.apache.kyuubi.engine.spark.events.{EventLoggingService,
SparkOperationEvent}
+import org.apache.kyuubi.engine.spark.events.SparkOperationEvent
+import org.apache.kyuubi.events.EventLogging
import org.apache.kyuubi.operation.{ArrayFetchIterator, IterableFetchIterator,
OperationState, OperationType}
import org.apache.kyuubi.operation.OperationState.OperationState
import org.apache.kyuubi.operation.log.OperationLog
@@ -49,7 +50,7 @@ class ExecuteStatement(
private val operationListener: SQLOperationListener = new
SQLOperationListener(this, spark)
- EventLoggingService.onEvent(SparkOperationEvent(this))
+ EventLogging.onEvent(SparkOperationEvent(this))
override protected def resultSchema: StructType = {
if (result == null || result.schema.isEmpty) {
@@ -146,7 +147,7 @@ class ExecuteStatement(
override def setState(newState: OperationState): Unit = {
super.setState(newState)
- EventLoggingService.onEvent(
+ EventLogging.onEvent(
SparkOperationEvent(this, operationListener.getExecutionId))
}
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
index 95d22ea..1dc59ee 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
@@ -20,9 +20,10 @@ package org.apache.kyuubi.engine.spark.session
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.spark.sql.{AnalysisException, SparkSession}
-import org.apache.kyuubi.engine.spark.events.{EventLoggingService,
SessionEvent}
+import org.apache.kyuubi.engine.spark.events.SessionEvent
import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
import org.apache.kyuubi.engine.spark.udf.KDFRegistry
+import org.apache.kyuubi.events.EventLogging
import org.apache.kyuubi.operation.{Operation, OperationHandle}
import org.apache.kyuubi.session.{AbstractSession, SessionHandle,
SessionManager}
@@ -56,7 +57,7 @@ class SparkSessionImpl(
case (key, value) => setModifiableConfig(key, value)
}
KDFRegistry.registerAll(spark)
- EventLoggingService.onEvent(sessionEvent)
+ EventLogging.onEvent(sessionEvent)
super.open()
}
@@ -67,7 +68,7 @@ class SparkSessionImpl(
override def close(): Unit = {
sessionEvent.endTime = System.currentTimeMillis()
- EventLoggingService.onEvent(sessionEvent)
+ EventLogging.onEvent(sessionEvent)
super.close()
spark.sessionState.catalog.getTempViewNames().foreach(spark.catalog.uncacheTable(_))
sessionManager.operationManager.asInstanceOf[SparkSQLOperationManager].closeILoop(handle)
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkContextHelper.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkContextHelper.scala
index ba67151..802c0f6 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkContextHelper.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkContextHelper.scala
@@ -20,7 +20,7 @@ package org.apache.spark.kyuubi
import org.apache.hadoop.security.Credentials
import org.apache.spark.SparkContext
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.scheduler.SchedulerBackend
+import org.apache.spark.scheduler.{SchedulerBackend, SparkListenerEvent}
import
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.local.LocalSchedulerBackend
@@ -29,8 +29,7 @@ import org.apache.spark.ui.SparkUI
import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_STATEMENT_ID_KEY
-import org.apache.kyuubi.engine.spark.events.KyuubiSparkEvent
-import org.apache.kyuubi.events.EventLogger
+import org.apache.kyuubi.events.{EventLogger, KyuubiEvent}
/**
* A place to invoke non-public APIs of [[SparkContext]], anything to be added
here need to
@@ -38,7 +37,7 @@ import org.apache.kyuubi.events.EventLogger
*/
object SparkContextHelper extends Logging {
- def createSparkHistoryLogger(sc: SparkContext):
EventLogger[KyuubiSparkEvent] = {
+ def createSparkHistoryLogger(sc: SparkContext): EventLogger = {
new SparkHistoryEventLogger(sc)
}
@@ -87,8 +86,11 @@ object SparkContextHelper extends Logging {
* A [[EventLogger]] that logs everything to SparkHistory
* @param sc SparkContext
*/
-private class SparkHistoryEventLogger(sc: SparkContext) extends
EventLogger[KyuubiSparkEvent] {
- override def logEvent(kyuubiEvent: KyuubiSparkEvent): Unit = {
- sc.listenerBus.post(kyuubiEvent)
+private class SparkHistoryEventLogger(sc: SparkContext) extends EventLogger {
+ override def logEvent(kyuubiEvent: KyuubiEvent): Unit = {
+ kyuubiEvent match {
+ case s: SparkListenerEvent => sc.listenerBus.post(s)
+ case _ => // ignore
+ }
}
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EventLoggingServiceSuite.scala
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EventLoggingServiceSuite.scala
index 495ae59..007ff67 100644
---
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EventLoggingServiceSuite.scala
+++
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EventLoggingServiceSuite.scala
@@ -62,8 +62,8 @@ class EventLoggingServiceSuite extends WithSparkSQLEngine
with HiveJDBCTestHelpe
val engineEventReader = new BufferedReader(new InputStreamReader(fs))
val readEvent =
- JsonProtocol.jsonToEvent(engineEventReader.readLine(),
classOf[KyuubiSparkEvent])
- assert(readEvent.isInstanceOf[KyuubiSparkEvent])
+ JsonProtocol.jsonToEvent(engineEventReader.readLine(),
classOf[EngineEvent])
+ assert(readEvent.isInstanceOf[EngineEvent])
withJdbcStatement() { statement =>
val table = engineEventPath.getParent
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/events/AbstractEventLoggingService.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/events/AbstractEventLoggingService.scala
index 08ad39f..248d3c7 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/events/AbstractEventLoggingService.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/events/AbstractEventLoggingService.scala
@@ -21,16 +21,39 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.kyuubi.service.CompositeService
-abstract class AbstractEventLoggingService[T <: KyuubiEvent]
+abstract class AbstractEventLoggingService
extends CompositeService("EventLogging") {
- private val eventLoggers = new ArrayBuffer[EventLogger[T]]()
+ import EventLogging._
- def onEvent(event: T): Unit = {
+ private val eventLoggers = new ArrayBuffer[EventLogger]()
+
+ def onEvent(event: KyuubiEvent): Unit = {
eventLoggers.foreach(_.logEvent(event))
}
- def addEventLogger(logger: EventLogger[T]): Unit = {
+ def addEventLogger(logger: EventLogger): Unit = {
eventLoggers += logger
}
+
+ override def start(): Unit = synchronized {
+ super.start()
+ // expose the event logging service only when the loggers successfully
start
+ _service = Some(this)
+ }
+
+ override def stop(): Unit = synchronized {
+ _service = None
+ super.stop()
+ }
+
+}
+
+object EventLogging {
+
+ private[events] var _service: Option[AbstractEventLoggingService] = None
+
+ def onEvent(event: KyuubiEvent): Unit = {
+ _service.foreach(_.onEvent(event))
+ }
}
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/events/EventLogger.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/events/EventLogger.scala
index e56136e..875fc0b 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/events/EventLogger.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/events/EventLogger.scala
@@ -17,8 +17,8 @@
package org.apache.kyuubi.events
-trait EventLogger[T <: KyuubiEvent] {
+trait EventLogger {
- def logEvent(kyuubiEvent: T): Unit
+ def logEvent(kyuubiEvent: KyuubiEvent): Unit
}
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/events/JsonEventLogger.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/events/JsonEventLogger.scala
index f85f28c..9537a7e 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/events/JsonEventLogger.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/events/JsonEventLogger.scala
@@ -41,11 +41,11 @@ import org.apache.kyuubi.service.AbstractService
* The ${date} is based on the time of events, e.g. engine.startTime,
statement.startTime
* @param logName the engine id formed of appId + attemptId(if any)
*/
-class JsonEventLogger[T <: KyuubiEvent](
+class JsonEventLogger(
logName: String,
logPath: ConfigEntry[String],
hadoopConf: Configuration)
- extends AbstractService("JsonEventLogger") with EventLogger[T] with Logging {
+ extends AbstractService("JsonEventLogger") with EventLogger with Logging {
type Logger = (PrintWriter, Option[FSDataOutputStream])
@@ -105,7 +105,7 @@ class JsonEventLogger[T <: KyuubiEvent](
super.stop()
}
- override def logEvent(kyuubiEvent: T): Unit = {
+ override def logEvent(kyuubiEvent: KyuubiEvent): Unit = {
val (writer, stream) = getOrUpdate(kyuubiEvent)
// scalastyle:off println
writer.println(kyuubiEvent.toJson)
@@ -119,7 +119,7 @@ class JsonEventLogger[T <: KyuubiEvent](
val logRoot: URI = URI.create(conf.get(ENGINE_EVENT_JSON_LOG_PATH))
val fs: FileSystem = FileSystem.get(logRoot, hadoopConf)
val success: Boolean = FileSystem.mkdirs(fs, new Path(logRoot),
JSON_LOG_DIR_PERM)
- if (success == false) {
+ if (!success) {
val fileStatus = fs.getFileStatus(new Path(logRoot))
if (!fileStatus.isDirectory) {
throw new IllegalArgumentException(s"Log directory $logRoot is not a
directory.")
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiOperationEvent.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiOperationEvent.scala
index 0ecb732..34c2360 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiOperationEvent.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiOperationEvent.scala
@@ -53,7 +53,7 @@ case class KyuubiOperationEvent private (
completeTime: Long,
exception: Option[Throwable],
sessionId: String,
- sessionUser: String) extends KyuubiServerEvent {
+ sessionUser: String) extends KyuubiEvent {
// operation events are partitioned by the date when the corresponding
operations are
// created.
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiServerEvent.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiServerEvent.scala
deleted file mode 100644
index aee2ed5..0000000
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiServerEvent.scala
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.events
-
-trait KyuubiServerEvent extends KyuubiEvent {}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiServerInfoEvent.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiServerInfoEvent.scala
index 18af357..95d826b 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiServerInfoEvent.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiServerInfoEvent.scala
@@ -39,7 +39,7 @@ case class KyuubiServerInfoEvent private (
state: String,
serverIP: String,
serverConf: Map[String, String],
- serverEnv: Map[String, String]) extends KyuubiServerEvent {
+ serverEnv: Map[String, String]) extends KyuubiEvent {
val BUILD_USER: String = P_BUILD_USER
val BUILD_DATE: String = P_BUILD_DATE
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiSessionEvent.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiSessionEvent.scala
index 9196c4d..a56b895 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiSessionEvent.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiSessionEvent.scala
@@ -51,7 +51,7 @@ case class KyuubiSessionEvent(
var engineId: String = "",
var openedTime: Long = -1L,
var endTime: Long = -1L,
- var totalOperations: Int = 0) extends KyuubiServerEvent {
+ var totalOperations: Int = 0) extends KyuubiEvent {
override def partitions: Seq[(String, String)] =
("day", Utils.getDateFromTimestamp(startTime)) :: Nil
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
index 3a9b265..74b8656 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
@@ -25,11 +25,10 @@ import org.apache.thrift.TException
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.events.KyuubiOperationEvent
+import org.apache.kyuubi.events.{EventLogging, KyuubiOperationEvent}
import org.apache.kyuubi.operation.FetchOrientation.FETCH_NEXT
import org.apache.kyuubi.operation.OperationState.OperationState
import org.apache.kyuubi.operation.log.OperationLog
-import org.apache.kyuubi.server.EventLoggingService
import org.apache.kyuubi.session.{KyuubiSessionImpl, KyuubiSessionManager,
Session}
class ExecuteStatement(
@@ -39,7 +38,7 @@ class ExecuteStatement(
override val shouldRunAsync: Boolean,
queryTimeout: Long)
extends KyuubiOperation(OperationType.EXECUTE_STATEMENT, session) {
- EventLoggingService.onEvent(KyuubiOperationEvent(this))
+ EventLogging.onEvent(KyuubiOperationEvent(this))
final private val _operationLog: OperationLog =
if (shouldRunAsync) {
@@ -175,7 +174,7 @@ class ExecuteStatement(
override def setState(newState: OperationState): Unit = {
super.setState(newState)
- EventLoggingService.onEvent(KyuubiOperationEvent(this))
+ EventLogging.onEvent(KyuubiOperationEvent(this))
}
override def close(): Unit = {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/EventLoggingService.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/EventLoggingService.scala
index ee7e65e..adb3e12 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/EventLoggingService.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/EventLoggingService.scala
@@ -20,15 +20,11 @@ package org.apache.kyuubi.server
import java.net.InetAddress
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.SERVER_EVENT_JSON_LOG_PATH
-import org.apache.kyuubi.config.KyuubiConf.SERVER_EVENT_LOGGERS
-import org.apache.kyuubi.events.{AbstractEventLoggingService, EventLoggerType}
-import org.apache.kyuubi.events.JsonEventLogger
-import org.apache.kyuubi.events.KyuubiServerEvent
-import org.apache.kyuubi.server.EventLoggingService._service
+import org.apache.kyuubi.config.KyuubiConf.{SERVER_EVENT_JSON_LOG_PATH,
SERVER_EVENT_LOGGERS}
+import org.apache.kyuubi.events.{AbstractEventLoggingService, EventLoggerType,
JsonEventLogger}
import org.apache.kyuubi.util.KyuubiHadoopUtils
-class EventLoggingService extends
AbstractEventLoggingService[KyuubiServerEvent] {
+class EventLoggingService extends AbstractEventLoggingService {
override def initialize(conf: KyuubiConf): Unit = {
val hadoopConf = KyuubiHadoopUtils.newHadoopConf(conf)
@@ -37,7 +33,7 @@ class EventLoggingService extends
AbstractEventLoggingService[KyuubiServerEvent]
.foreach {
case EventLoggerType.JSON =>
val hostName = InetAddress.getLocalHost.getCanonicalHostName
- val jsonEventLogger = new JsonEventLogger[KyuubiServerEvent](
+ val jsonEventLogger = new JsonEventLogger(
s"server-$hostName",
SERVER_EVENT_JSON_LOG_PATH,
hadoopConf)
@@ -51,24 +47,4 @@ class EventLoggingService extends
AbstractEventLoggingService[KyuubiServerEvent]
}
super.initialize(conf)
}
-
- override def start(): Unit = {
- _service = Some(this)
- super.start()
- }
-
- override def stop(): Unit = {
- _service = None
- super.stop()
- }
-
-}
-
-object EventLoggingService {
-
- private var _service: Option[EventLoggingService] = None
-
- def onEvent(event: KyuubiServerEvent): Unit = {
- _service.foreach(_.onEvent(event))
- }
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
index 003112b..3f6e34e 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
@@ -31,7 +31,7 @@ import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{FRONTEND_PROTOCOLS,
FrontendProtocols}
import org.apache.kyuubi.config.KyuubiConf.FrontendProtocols._
-import org.apache.kyuubi.events.KyuubiServerInfoEvent
+import org.apache.kyuubi.events.{EventLogging, KyuubiServerInfoEvent}
import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.client.{ServiceDiscovery, ZooKeeperAuthTypes}
import org.apache.kyuubi.ha.client.ZooKeeperClientProvider._
@@ -154,7 +154,7 @@ class KyuubiServer(name: String) extends Serverable(name) {
throw new UnsupportedOperationException(s"Frontend protocol $other is
not supported yet.")
}
- private val eventLoggingService: EventLoggingService = new
EventLoggingService
+ private val eventLoggingService = new EventLoggingService()
override def initialize(conf: KyuubiConf): Unit = synchronized {
val kinit = new KinitAuxiliaryService()
@@ -170,11 +170,11 @@ class KyuubiServer(name: String) extends Serverable(name)
{
override def start(): Unit = {
super.start()
KyuubiServer.kyuubiServer = this
- KyuubiServerInfoEvent(this,
ServiceState.STARTED).foreach(EventLoggingService.onEvent)
+ KyuubiServerInfoEvent(this,
ServiceState.STARTED).foreach(EventLogging.onEvent)
}
override def stop(): Unit = {
- KyuubiServerInfoEvent(this,
ServiceState.STOPPED).foreach(EventLoggingService.onEvent)
+ KyuubiServerInfoEvent(this,
ServiceState.STOPPED).foreach(EventLogging.onEvent)
super.stop()
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index 3ffe713..a460302 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -27,13 +27,12 @@ import org.apache.kyuubi.client.KyuubiSyncThriftClient
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.EngineRef
-import org.apache.kyuubi.events.{KyuubiEvent, KyuubiSessionEvent}
+import org.apache.kyuubi.events.{EventLogging, KyuubiEvent, KyuubiSessionEvent}
import org.apache.kyuubi.ha.client.ZooKeeperClientProvider._
import org.apache.kyuubi.metrics.MetricsConstants._
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.{Operation, OperationHandle}
import org.apache.kyuubi.operation.log.OperationLog
-import org.apache.kyuubi.server.EventLoggingService
import org.apache.kyuubi.service.authentication.EngineSecurityAccessor
class KyuubiSessionImpl(
@@ -71,7 +70,7 @@ class KyuubiSessionImpl(
.newLaunchEngineOperation(this,
sessionConf.get(SESSION_ENGINE_LAUNCH_ASYNC))
private val sessionEvent = KyuubiSessionEvent(this)
- EventLoggingService.onEvent(sessionEvent)
+ EventLogging.onEvent(sessionEvent)
override def getSessionEvent: Option[KyuubiEvent] = {
Option(sessionEvent)
@@ -109,7 +108,7 @@ class KyuubiSessionImpl(
sessionEvent.openedTime = System.currentTimeMillis()
sessionEvent.remoteSessionId = _engineSessionHandle.identifier.toString
_client.engineId.foreach(e => sessionEvent.engineId = e)
- EventLoggingService.onEvent(sessionEvent)
+ EventLogging.onEvent(sessionEvent)
}
}
@@ -155,7 +154,7 @@ class KyuubiSessionImpl(
if (_client != null) _client.closeSession()
} finally {
sessionEvent.endTime = System.currentTimeMillis()
- EventLoggingService.onEvent(sessionEvent)
+ EventLogging.onEvent(sessionEvent)
MetricsSystem.tracing(_.decCount(MetricRegistry.name(CONN_OPEN, user)))
}
}