This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new d49f8bf  [KYUUBI #988] Add Kyuubi session event
d49f8bf is described below

commit d49f8bf06d061b4112ee323b88771658de071b0b
Author: ulysses-you <[email protected]>
AuthorDate: Mon Aug 30 16:54:01 2021 +0800

    [KYUUBI #988] Add Kyuubi session event
    
    <!--
    Thanks for sending a pull request!
    
    Here are some tips for you:
      1. If this is your first time, please read our contributor guidelines: 
https://kyuubi.readthedocs.io/en/latest/community/contributions.html
      2. If the PR is related to an issue in 
https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your 
PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
      3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., 
'[WIP][KYUUBI #XXXX] Your PR title ...'.
    -->
    
    ### _Why are the changes needed?_
    <!--
    Please clarify why the changes are needed. For instance,
      1. If you add a feature, you can talk about the use case of it.
      2. If you fix a bug, you can clarify why it is a bug.
    -->
    closes https://github.com/apache/incubator-kyuubi/issues/988
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run 
test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests)
 locally before making a pull request
    
    Closes #989 from ulysses-you/kyuubi-session-event.
    
    Closes #988
    
    baebf2eb [ulysses-you] address comment
    283d574c [ulysses-you] fix
    c5d1d5f7 [ulysses-you] address comment
    44cfd06f [ulysses-you] docs
    43639993 [ulysses-you] simplify session
    2618b764 [ulysses-you] config name
    a2d4608b [ulysses-you] docs
    5adbe5c8 [ulysses-you] config
    ea5c70e1 [ulysses-you] nit
    eea73ed5 [ulysses-you] config
    cda6d38b [ulysses-you] test
    a754c76f [ulysses-you] init
    
    Authored-by: ulysses-you <[email protected]>
    Signed-off-by: ulysses-you <[email protected]>
---
 docs/deployment/settings.md                        |  1 +
 .../org/apache/kyuubi/config/KyuubiConf.scala      |  9 +++
 .../apache/kyuubi/events/KyuubiSessionEvent.scala  | 68 ++++++++++++++++++++++
 .../org/apache/kyuubi/server/KyuubiServer.scala    |  6 ++
 .../apache/kyuubi/session/KyuubiSessionImpl.scala  | 12 ++++
 .../kyuubi/events/EventLoggingServiceSuite.scala   | 26 +++++++++
 6 files changed, 122 insertions(+)

diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 934a151..6237227 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -259,6 +259,7 @@ kyuubi\.session\.engine<br>\.share\.level|<div 
style='width: 65pt;word-wrap: bre
 kyuubi\.session\.engine<br>\.spark\.main\.resource|<div style='width: 
65pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div 
style='width: 170pt;word-wrap: break-word;white-space: normal'>The package used 
to create Spark SQL engine remote application. If it is undefined, Kyuubi will 
use the default</div>|<div style='width: 30pt'>string</div>|<div style='width: 
20pt'>1.0.0</div>
 kyuubi\.session\.engine<br>\.startup\.error\.max<br>\.size|<div style='width: 
65pt;word-wrap: break-word;white-space: normal'>8192</div>|<div style='width: 
170pt;word-wrap: break-word;white-space: normal'>During engine bootstrapping, 
if error occurs, using this config to limit the length error 
message(characters).</div>|<div style='width: 30pt'>int</div>|<div 
style='width: 20pt'>1.1.0</div>
 kyuubi\.session\.idle<br>\.timeout|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>PT6H</div>|<div style='width: 170pt;word-wrap: 
break-word;white-space: normal'>session idle timeout, it will be closed when 
it's not accessed for this duration</div>|<div style='width: 
30pt'>duration</div>|<div style='width: 20pt'>1.2.0</div>
+kyuubi\.session\.name|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 
170pt;word-wrap: break-word;white-space: normal'>A human readable name of 
session and we use empty string by default. This name will be recorded in 
event. Note that, we only apply this value from session conf.</div>|<div 
style='width: 30pt'>string</div>|<div style='width: 20pt'>1.4.0</div>
 kyuubi\.session<br>\.timeout|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>PT6H</div>|<div style='width: 170pt;word-wrap: 
break-word;white-space: normal'>(deprecated)session timeout, it will be closed 
when it's not accessed for this duration</div>|<div style='width: 
30pt'>duration</div>|<div style='width: 20pt'>1.0.0</div>
 
 
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 7627ef2..ae5cd22 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -752,4 +752,13 @@ object KyuubiConf {
       .version("1.3.0")
       .booleanConf
       .createWithDefault(true)
+
+  val SESSION_NAME: OptionalConfigEntry[String] =
+    buildConf("session.name")
+      .doc("A human readable name of session and we use empty string by 
default. " +
+        "This name will be recorded in event. Note that, we only apply this 
value from " +
+        "session conf.")
+      .version("1.4.0")
+      .stringConf
+      .createOptional
 }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiSessionEvent.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiSessionEvent.scala
new file mode 100644
index 0000000..668c80a
--- /dev/null
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiSessionEvent.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.events
+
+import org.apache.kyuubi.Utils
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.server.KyuubiServer
+import org.apache.kyuubi.session.KyuubiSessionImpl
+
+/**
+ * @param sessionId server session id
+ * @param sessionName if user not specify it, we use empty string instead
+ * @param user session user
+ * @param clientIP client ip address
+ * @param serverIP A unique Kyuubi server id, e.g. kyuubi server ip address 
and port,
+ *                 it is useful if has multi-instance Kyuubi Server
+ * @param clientVersion client version
+ * @param conf session config
+ * @param startTime session create time
+ * @param endTime session end time
+ * @param totalOperations how many queries and meta calls
+ */
+case class KyuubiSessionEvent(
+    sessionId: String,
+    sessionName: String,
+    user: String,
+    clientIP: String,
+    serverIP: String,
+    clientVersion: Int,
+    conf: Map[String, String],
+    startTime: Long,
+    var endTime: Long = -1L,
+    var totalOperations: Int = 0) extends KyuubiServerEvent {
+  override def partitions: Seq[(String, String)] =
+    ("day", Utils.getDateFromTimestamp(startTime)) :: Nil
+}
+
+object KyuubiSessionEvent {
+  def apply(session: KyuubiSessionImpl): KyuubiSessionEvent = {
+    assert(KyuubiServer.kyuubiServer != null)
+    val serverIP = KyuubiServer.kyuubiServer.connectionUrl
+    val sessionName: String = 
session.normalizedConf.getOrElse(KyuubiConf.SESSION_NAME.key, "")
+    KyuubiSessionEvent(
+      session.handle.identifier.toString,
+      sessionName,
+      session.user,
+      session.ipAddress,
+      serverIP,
+      session.handle.protocol.getValue,
+      session.conf,
+      session.createTime)
+  }
+}
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
index d97f827..07c5bfe 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
@@ -32,6 +32,7 @@ import org.apache.kyuubi.zookeeper.EmbeddedZookeeper
 
 object KyuubiServer extends Logging {
   private val zkServer = new EmbeddedZookeeper()
+  private[kyuubi] var kyuubiServer: KyuubiServer = _
 
   def startServer(conf: KyuubiConf): KyuubiServer = {
     if (!ServiceDiscovery.supportServiceDiscovery(conf)) {
@@ -103,6 +104,11 @@ class KyuubiServer(name: String) extends Serverable(name) {
     super.initialize(conf)
   }
 
+  override def start(): Unit = {
+    super.start()
+    KyuubiServer.kyuubiServer = this
+  }
+
   override protected def stopServer(): Unit = {}
 
   override def connectionUrl: String = {
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index a3a0c59..8d416e6 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -28,9 +28,12 @@ import org.apache.kyuubi.client.KyuubiSyncThriftClient
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.engine.EngineRef
+import org.apache.kyuubi.events.KyuubiSessionEvent
 import org.apache.kyuubi.ha.client.ZooKeeperClientProvider._
 import org.apache.kyuubi.metrics.MetricsConstants._
 import org.apache.kyuubi.metrics.MetricsSystem
+import org.apache.kyuubi.operation.{Operation, OperationHandle}
+import org.apache.kyuubi.server.EventLoggingService
 import org.apache.kyuubi.service.authentication.PlainSASLHelper
 
 class KyuubiSessionImpl(
@@ -51,6 +54,8 @@ class KyuubiSessionImpl(
   }
 
   private val engine: EngineRef = EngineRef(sessionConf, user, handle)
+  private val sessionEvent = KyuubiSessionEvent(this)
+  EventLoggingService.onEvent(sessionEvent)
 
   private var transport: TTransport = _
   private var client: KyuubiSyncThriftClient = _
@@ -81,6 +86,11 @@ class KyuubiSessionImpl(
     sessionManager.operationManager.setConnection(handle, client)
   }
 
+  override protected def runOperation(operation: Operation): OperationHandle = 
{
+    sessionEvent.totalOperations += 1
+    super.runOperation(operation)
+  }
+
   override def close(): Unit = {
     super.close()
     sessionManager.operationManager.removeConnection(handle)
@@ -90,6 +100,8 @@ class KyuubiSessionImpl(
       case e: TException =>
         throw KyuubiSQLException("Error while cleaning up the engine 
resources", e)
     } finally {
+      sessionEvent.endTime = System.currentTimeMillis()
+      EventLoggingService.onEvent(sessionEvent)
       MetricsSystem.tracing(_.decCount(MetricRegistry.name(CONN_OPEN, user)))
       if (transport != null && transport.isOpen) {
         transport.close()
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala
index 988fe4d..b8ea732 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala
@@ -79,4 +79,30 @@ class EventLoggingServiceSuite extends WithKyuubiServer with 
JDBCTestUtils {
       }
     }
   }
+
+  test("test Kyuubi session event") {
+    withSessionConf()(Map.empty)(Map(KyuubiConf.SESSION_NAME.key -> "test1")) {
+      withJdbcStatement() { statement =>
+        statement.execute("SELECT 1")
+      }
+    }
+
+    val eventPath =
+      Paths.get(logRoot.toString, "kyuubi-session", s"day=$currentDate")
+    withSessionConf()(Map.empty)(Map("spark.sql.shuffle.partitions" -> "2")) {
+      withJdbcStatement() { statement =>
+        val res = statement.executeQuery(
+          s"SELECT * FROM `json`.`$eventPath` where sessionName = 'test1' 
order by totalOperations")
+        assert(res.next())
+        assert(res.getString("user") == Utils.currentUser)
+        assert(res.getString("sessionName") == "test1")
+        assert(res.getLong("startTime") > 0)
+        assert(res.getInt("totalOperations") == 0)
+        assert(res.next())
+        assert(res.getInt("totalOperations") == 1)
+        assert(res.getLong("endTime") > 0)
+        assert(!res.next())
+      }
+    }
+  }
 }

Reply via email to