This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new d49f8bf [KYUUBI #988] Add Kyuubi session event
d49f8bf is described below
commit d49f8bf06d061b4112ee323b88771658de071b0b
Author: ulysses-you <[email protected]>
AuthorDate: Mon Aug 30 16:54:01 2021 +0800
[KYUUBI #988] Add Kyuubi session event
<!--
Thanks for sending a pull request!
Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://kyuubi.readthedocs.io/en/latest/community/contributions.html
2. If the PR is related to an issue in
https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your
PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
3. If the PR is unfinished, add '[WIP]' in your PR title, e.g.,
'[WIP][KYUUBI #XXXX] Your PR title ...'.
-->
### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
1. If you add a feature, you can talk about the use case of it.
2. If you fix a bug, you can clarify why it is a bug.
-->
closes https://github.com/apache/incubator-kyuubi/issues/988
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run
test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests)
locally before make a pull request
Closes #989 from ulysses-you/kyuubi-session-event.
Closes #988
baebf2eb [ulysses-you] address comment
283d574c [ulysses-you] fix
c5d1d5f7 [ulysses-you] adddress comment
44cfd06f [ulysses-you] docs
43639993 [ulysses-you] simplify session
2618b764 [ulysses-you] config name
a2d4608b [ulysses-you] docs
5adbe5c8 [ulysses-you] config
ea5c70e1 [ulysses-you] nit
eea73ed5 [ulysses-you] config
cda6d38b [ulysses-you] test
a754c76f [ulysses-you] init
Authored-by: ulysses-you <[email protected]>
Signed-off-by: ulysses-you <[email protected]>
---
docs/deployment/settings.md | 1 +
.../org/apache/kyuubi/config/KyuubiConf.scala | 9 +++
.../apache/kyuubi/events/KyuubiSessionEvent.scala | 68 ++++++++++++++++++++++
.../org/apache/kyuubi/server/KyuubiServer.scala | 6 ++
.../apache/kyuubi/session/KyuubiSessionImpl.scala | 12 ++++
.../kyuubi/events/EventLoggingServiceSuite.scala | 26 +++++++++
6 files changed, 122 insertions(+)
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 934a151..6237227 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -259,6 +259,7 @@ kyuubi\.session\.engine<br>\.share\.level|<div
style='width: 65pt;word-wrap: bre
kyuubi\.session\.engine<br>\.spark\.main\.resource|<div style='width:
65pt;word-wrap: break-word;white-space: normal'><undefined></div>|<div
style='width: 170pt;word-wrap: break-word;white-space: normal'>The package used
to create Spark SQL engine remote application. If it is undefined, Kyuubi will
use the default</div>|<div style='width: 30pt'>string</div>|<div style='width:
20pt'>1.0.0</div>
kyuubi\.session\.engine<br>\.startup\.error\.max<br>\.size|<div style='width:
65pt;word-wrap: break-word;white-space: normal'>8192</div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>During engine bootstrapping,
if error occurs, using this config to limit the length error
message(characters).</div>|<div style='width: 30pt'>int</div>|<div
style='width: 20pt'>1.1.0</div>
kyuubi\.session\.idle<br>\.timeout|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'>PT6H</div>|<div style='width: 170pt;word-wrap:
break-word;white-space: normal'>session idle timeout, it will be closed when
it's not accessed for this duration</div>|<div style='width:
30pt'>duration</div>|<div style='width: 20pt'>1.2.0</div>
+kyuubi\.session\.name|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'><undefined></div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>A human readable name of
session and we use empty string by default. This name will be recorded in
event. Note that, we only apply this value from session conf.</div>|<div
style='width: 30pt'>string</div>|<div style='width: 20pt'>1.4.0</div>
kyuubi\.session<br>\.timeout|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'>PT6H</div>|<div style='width: 170pt;word-wrap:
break-word;white-space: normal'>(deprecated)session timeout, it will be closed
when it's not accessed for this duration</div>|<div style='width:
30pt'>duration</div>|<div style='width: 20pt'>1.0.0</div>
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 7627ef2..ae5cd22 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -752,4 +752,13 @@ object KyuubiConf {
.version("1.3.0")
.booleanConf
.createWithDefault(true)
+
+ val SESSION_NAME: OptionalConfigEntry[String] =
+ buildConf("session.name")
+ .doc("A human readable name of session and we use empty string by
default. " +
+ "This name will be recorded in event. Note that, we only apply this
value from " +
+ "session conf.")
+ .version("1.4.0")
+ .stringConf
+ .createOptional
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiSessionEvent.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiSessionEvent.scala
new file mode 100644
index 0000000..668c80a
--- /dev/null
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiSessionEvent.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.events
+
+import org.apache.kyuubi.Utils
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.server.KyuubiServer
+import org.apache.kyuubi.session.KyuubiSessionImpl
+
+/**
+ * @param sessionId server session id
+ * @param sessionName if user not specify it, we use empty string instead
+ * @param user session user
+ * @param clientIP client ip address
+ * @param serverIP A unique Kyuubi server id, e.g. kyuubi server ip address
and port,
+ * it is useful if has multi-instance Kyuubi Server
+ * @param clientVersion client version
+ * @param conf session config
+ * @param startTime session create time
+ * @param endTime session end time
+ * @param totalOperations how many queries and meta calls
+ */
+case class KyuubiSessionEvent(
+ sessionId: String,
+ sessionName: String,
+ user: String,
+ clientIP: String,
+ serverIP: String,
+ clientVersion: Int,
+ conf: Map[String, String],
+ startTime: Long,
+ var endTime: Long = -1L,
+ var totalOperations: Int = 0) extends KyuubiServerEvent {
+ override def partitions: Seq[(String, String)] =
+ ("day", Utils.getDateFromTimestamp(startTime)) :: Nil
+}
+
+object KyuubiSessionEvent {
+ def apply(session: KyuubiSessionImpl): KyuubiSessionEvent = {
+ assert(KyuubiServer.kyuubiServer != null)
+ val serverIP = KyuubiServer.kyuubiServer.connectionUrl
+ val sessionName: String =
session.normalizedConf.getOrElse(KyuubiConf.SESSION_NAME.key, "")
+ KyuubiSessionEvent(
+ session.handle.identifier.toString,
+ sessionName,
+ session.user,
+ session.ipAddress,
+ serverIP,
+ session.handle.protocol.getValue,
+ session.conf,
+ session.createTime)
+ }
+}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
index d97f827..07c5bfe 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
@@ -32,6 +32,7 @@ import org.apache.kyuubi.zookeeper.EmbeddedZookeeper
object KyuubiServer extends Logging {
private val zkServer = new EmbeddedZookeeper()
+ private[kyuubi] var kyuubiServer: KyuubiServer = _
def startServer(conf: KyuubiConf): KyuubiServer = {
if (!ServiceDiscovery.supportServiceDiscovery(conf)) {
@@ -103,6 +104,11 @@ class KyuubiServer(name: String) extends Serverable(name) {
super.initialize(conf)
}
+ override def start(): Unit = {
+ super.start()
+ KyuubiServer.kyuubiServer = this
+ }
+
override protected def stopServer(): Unit = {}
override def connectionUrl: String = {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index a3a0c59..8d416e6 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -28,9 +28,12 @@ import org.apache.kyuubi.client.KyuubiSyncThriftClient
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.EngineRef
+import org.apache.kyuubi.events.KyuubiSessionEvent
import org.apache.kyuubi.ha.client.ZooKeeperClientProvider._
import org.apache.kyuubi.metrics.MetricsConstants._
import org.apache.kyuubi.metrics.MetricsSystem
+import org.apache.kyuubi.operation.{Operation, OperationHandle}
+import org.apache.kyuubi.server.EventLoggingService
import org.apache.kyuubi.service.authentication.PlainSASLHelper
class KyuubiSessionImpl(
@@ -51,6 +54,8 @@ class KyuubiSessionImpl(
}
private val engine: EngineRef = EngineRef(sessionConf, user, handle)
+ private val sessionEvent = KyuubiSessionEvent(this)
+ EventLoggingService.onEvent(sessionEvent)
private var transport: TTransport = _
private var client: KyuubiSyncThriftClient = _
@@ -81,6 +86,11 @@ class KyuubiSessionImpl(
sessionManager.operationManager.setConnection(handle, client)
}
+ override protected def runOperation(operation: Operation): OperationHandle =
{
+ sessionEvent.totalOperations += 1
+ super.runOperation(operation)
+ }
+
override def close(): Unit = {
super.close()
sessionManager.operationManager.removeConnection(handle)
@@ -90,6 +100,8 @@ class KyuubiSessionImpl(
case e: TException =>
throw KyuubiSQLException("Error while cleaning up the engine
resources", e)
} finally {
+ sessionEvent.endTime = System.currentTimeMillis()
+ EventLoggingService.onEvent(sessionEvent)
MetricsSystem.tracing(_.decCount(MetricRegistry.name(CONN_OPEN, user)))
if (transport != null && transport.isOpen) {
transport.close()
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala
index 988fe4d..b8ea732 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala
@@ -79,4 +79,30 @@ class EventLoggingServiceSuite extends WithKyuubiServer with
JDBCTestUtils {
}
}
}
+
+ test("test Kyuubi session event") {
+ withSessionConf()(Map.empty)(Map(KyuubiConf.SESSION_NAME.key -> "test1")) {
+ withJdbcStatement() { statement =>
+ statement.execute("SELECT 1")
+ }
+ }
+
+ val eventPath =
+ Paths.get(logRoot.toString, "kyuubi-session", s"day=$currentDate")
+ withSessionConf()(Map.empty)(Map("spark.sql.shuffle.partitions" -> "2")) {
+ withJdbcStatement() { statement =>
+ val res = statement.executeQuery(
+ s"SELECT * FROM `json`.`$eventPath` where sessionName = 'test1'
order by totalOperations")
+ assert(res.next())
+ assert(res.getString("user") == Utils.currentUser)
+ assert(res.getString("sessionName") == "test1")
+ assert(res.getLong("startTime") > 0)
+ assert(res.getInt("totalOperations") == 0)
+ assert(res.next())
+ assert(res.getInt("totalOperations") == 1)
+ assert(res.getLong("endTime") > 0)
+ assert(!res.next())
+ }
+ }
+ }
}