This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 9db2e5c [KYUUBI #1356] Refine KyuubiStatementEvent
9db2e5c is described below
commit 9db2e5c054e3237411e36b1a410e06f98a633e66
Author: Kent Yao <[email protected]>
AuthorDate: Wed Nov 10 12:09:04 2021 +0800
[KYUUBI #1356] Refine KyuubiStatementEvent
<!--
Thanks for sending a pull request!
Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://kyuubi.readthedocs.io/en/latest/community/contributions.html
2. If the PR is related to an issue in
https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your
PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
3. If the PR is unfinished, add '[WIP]' in your PR title, e.g.,
'[WIP][KYUUBI #XXXX] Your PR title ...'.
-->
### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
1. If you add a feature, you can talk about the use case of it.
2. If you fix a bug, you can clarify why it is a bug.
-->
1. When a statement error happened, the event will be logged twice, in
`setState` and `setException`, we fix it
2. We break down KyuubiStatementEvent into three parts - statement basis,
status, and sessionId/user, see the improved comments
3. log the remote operation handle too
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests)
locally before make a pull request
Closes #1356 from yaooqinn/se.
Closes #1356
4b5d0de2 [Kent Yao] Refine KyuubiStatementEvent
e7228ddd [Kent Yao] Refine KyuubiStatementEvent
Authored-by: Kent Yao <[email protected]>
Signed-off-by: ulysses-you <[email protected]>
---
.../kyuubi/operation/AbstractOperation.scala | 6 +-
.../apache/kyuubi/operation/OperationStatus.scala | 2 +
.../kyuubi/operation/OperationStatusSuite.scala | 2 +-
.../kyuubi/events/KyuubiStatementEvent.scala | 74 ++++++++++++++--------
.../apache/kyuubi/operation/ExecuteStatement.scala | 19 +-----
.../apache/kyuubi/operation/KyuubiOperation.scala | 2 +-
.../kyuubi/events/EventLoggingServiceSuite.scala | 11 ++--
7 files changed, 64 insertions(+), 52 deletions(-)
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
index 02cc6d7..e017790 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
@@ -32,6 +32,7 @@ import org.apache.kyuubi.session.Session
abstract class AbstractOperation(opType: OperationType, session: Session)
extends Operation with Logging {
+ private final val createTime = System.currentTimeMillis()
private final val handle = OperationHandle(opType, session.protocol)
private final val operationTimeout: Long = {
session.sessionManager.getConf.get(OPERATION_IDLE_TIMEOUT)
@@ -44,7 +45,7 @@ abstract class AbstractOperation(opType: OperationType,
session: Session)
@volatile protected var state: OperationState = INITIALIZED
@volatile protected var startTime: Long = _
@volatile protected var completedTime: Long = _
- @volatile protected var lastAccessTime: Long = System.currentTimeMillis()
+ @volatile protected var lastAccessTime: Long = createTime
@volatile protected var operationException: KyuubiSQLException = _
@volatile protected var hasResultSet: Boolean = false
@@ -147,7 +148,8 @@ abstract class AbstractOperation(opType: OperationType,
session: Session)
override def getHandle: OperationHandle = handle
override def getStatus: OperationStatus = {
- OperationStatus(state, startTime, completedTime, hasResultSet,
Option(operationException))
+ OperationStatus(state, createTime, startTime, lastAccessTime,
completedTime, hasResultSet,
+ Option(operationException))
}
override def shouldRunAsync: Boolean
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationStatus.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationStatus.scala
index de813df..686dc6e 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationStatus.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationStatus.scala
@@ -22,7 +22,9 @@ import
org.apache.kyuubi.operation.OperationState.OperationState
case class OperationStatus(
state: OperationState,
+ create: Long,
start: Long,
+ lastModified: Long,
completed: Long,
hasResultSet: Boolean,
exception: Option[KyuubiSQLException] = None)
diff --git
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/OperationStatusSuite.scala
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/OperationStatusSuite.scala
index a84c063..4ec43ba 100644
---
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/OperationStatusSuite.scala
+++
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/OperationStatusSuite.scala
@@ -22,7 +22,7 @@ import org.apache.kyuubi.{KyuubiFunSuite, KyuubiSQLException}
class OperationStatusSuite extends KyuubiFunSuite {
test("operation status") {
- val status = OperationStatus(OperationState.INITIALIZED, 0, 0,
hasResultSet = false)
+ val status = OperationStatus(OperationState.INITIALIZED, 0, 0, 0, 0,
hasResultSet = false)
assert(status.exception.isEmpty)
val status1 = status.copy(exception = Some(KyuubiSQLException("nothing")))
assert(status1.exception.get.getMessage === "nothing")
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiStatementEvent.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiStatementEvent.scala
index d7da77d..e405954 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiStatementEvent.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiStatementEvent.scala
@@ -18,49 +18,69 @@
package org.apache.kyuubi.events
import org.apache.kyuubi.Utils
-import org.apache.kyuubi.operation.ExecuteStatement
-import org.apache.kyuubi.operation.OperationState.OperationState
+import org.apache.kyuubi.operation.{ExecuteStatement, OperationHandle}
/**
+ * A [[KyuubiStatementEvent]] used to tracker the lifecycle of a statement at
server side.
+ * <ul>
+ * <li>Statement Basis</li>
+ * <li>Statement Live Status</li>
+ * <li>Parent Session Id</li>
+ * </ul>
*
- * @param user: who connect to kyuubi server
- * @param statementId: the identifier of operationHandler
- * @param statement: the sql that you execute
- * @param remoteIp: the ip of user
- * @param sessionId: the identifier of a session
- * @param createTime: the create time of this statement
- * @param state: store each state that the sql has
- * @param stateTime: the time that the sql's state change
+ * @param statementId the unique identifier of a single statement
+ * @param remoteId the unique identifier of a single statement at engine side
+ * @param statement the sql that you execute
+ * @param shouldRunAsync the flag indicating whether the query runs
synchronously or not
+ * @param state the current operation state
+ * @param eventTime the time when the event created & logged
+ * @param createTime the time for changing to the current operation state
+ * @param startTime the time the query start to time of this statement
+ * @param completeTime time time the query ends
* @param exception: caught exception if have
+ * @param sessionId the identifier of the parent session
+ * @param sessionUser the authenticated client user
*/
-case class KyuubiStatementEvent(
- user: String,
+case class KyuubiStatementEvent private (
statementId: String,
+ remoteId: String,
statement: String,
- remoteIp: String,
- sessionId: String,
+ shouldRunAsync: Boolean,
+ state: String,
+ eventTime: Long,
createTime: Long,
- var state: String,
- var stateTime: Long,
- var exception: String = "") extends KyuubiServerEvent {
+ startTime: Long,
+ completeTime: Long,
+ exception: Option[Throwable],
+ sessionId: String,
+ sessionUser: String) extends KyuubiServerEvent {
+
+ // statement events are partitioned by the date when the corresponding
operations are
+ // created.
override def partitions: Seq[(String, String)] =
("day", Utils.getDateFromTimestamp(createTime)) :: Nil
}
object KyuubiStatementEvent {
- def apply(statement: ExecuteStatement,
- statementId: String,
- state: OperationState,
- stateTime: Long): KyuubiStatementEvent = {
+
+ /**
+ * Shorthand for instantiating a statement event with a [[ExecuteStatement]]
instance
+ */
+ def apply(statement: ExecuteStatement): KyuubiStatementEvent = {
val session = statement.getSession
+ val status = statement.getStatus
new KyuubiStatementEvent(
- session.user,
- statementId,
+ statement.getHandle.identifier.toString,
+
Option(statement.remoteOpHandle()).map(OperationHandle(_).identifier.toString).orNull,
statement.statement,
- session.ipAddress,
+ statement.shouldRunAsync,
+ status.state.name(),
+ status.lastModified,
+ status.create,
+ status.start,
+ status.completed,
+ status.exception,
session.handle.identifier.toString,
- stateTime,
- state.toString,
- stateTime)
+ session.user)
}
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
index 6df933b..643080a 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
@@ -41,11 +41,8 @@ class ExecuteStatement(
override val statement: String,
override val shouldRunAsync: Boolean,
queryTimeout: Long)
- extends KyuubiOperation(
- OperationType.EXECUTE_STATEMENT, session, client) {
-
- val statementEvent: KyuubiStatementEvent =
- KyuubiStatementEvent(this, statementId, state, lastAccessTime)
+ extends KyuubiOperation(OperationType.EXECUTE_STATEMENT, session, client) {
+ EventLoggingService.onEvent(KyuubiStatementEvent(this))
private final val _operationLog: OperationLog = if (shouldRunAsync) {
OperationLog.createOperationLog(session, getHandle)
@@ -59,8 +56,6 @@ class ExecuteStatement(
override def getOperationLog: Option[OperationLog] = Option(_operationLog)
- EventLoggingService.onEvent(statementEvent)
-
override def beforeRun(): Unit = {
OperationLog.setCurrentOperationLog(_operationLog)
setHasResultSet(true)
@@ -180,15 +175,7 @@ class ExecuteStatement(
override def setState(newState: OperationState): Unit = {
super.setState(newState)
- statementEvent.state = newState.toString
- statementEvent.stateTime = lastAccessTime
- EventLoggingService.onEvent(statementEvent)
- }
-
- override def setOperationException(opEx: KyuubiSQLException): Unit = {
- super.setOperationException(opEx)
- statementEvent.exception = opEx.toString
- EventLoggingService.onEvent(statementEvent)
+ EventLoggingService.onEvent(KyuubiStatementEvent(this))
}
override def close(): Unit = {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
index dc6ae77..a69814d 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
@@ -57,7 +57,6 @@ abstract class KyuubiOperation(
MetricsSystem.tracing {
_.incCount(MetricRegistry.name(STATEMENT_FAIL, errorType))
}
- setState(OperationState.ERROR)
val ke = e match {
case kse: KyuubiSQLException => kse
case te: TTransportException if te.getType ==
TTransportException.END_OF_FILE &&
@@ -69,6 +68,7 @@ abstract class KyuubiOperation(
KyuubiSQLException(s"Error $action $opType:
${Utils.stringifyException(e)}", e)
}
setOperationException(ke)
+ setState(OperationState.ERROR)
throw ke
}
}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala
index 59fcf37..7739861 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala
@@ -48,7 +48,7 @@ class EventLoggingServiceSuite extends WithKyuubiServer with
HiveJDBCTestHelper
override protected def jdbcUrl: String = getJdbcUrl
- test("statementEvent: generate, dump and query") {
+ test("round-trip for logging and querying statement events for both kyuubi
server and engine") {
val hostName = InetAddress.getLocalHost.getCanonicalHostName
val serverStatementEventPath =
Paths.get(serverLogRoot, "kyuubi_statement", s"day=$currentDate",
s"server-$hostName.json")
@@ -58,7 +58,6 @@ class EventLoggingServiceSuite extends WithKyuubiServer with
HiveJDBCTestHelper
withJdbcStatement() { statement =>
statement.execute(sql)
-
// check server statement events
val serverTable = serverStatementEventPath.getParent
val resultSet = statement.executeQuery(s"SELECT * FROM
`json`.`${serverTable}`" +
@@ -66,9 +65,11 @@ class EventLoggingServiceSuite extends WithKyuubiServer with
HiveJDBCTestHelper
val states = Array(INITIALIZED, PENDING, RUNNING, FINISHED, CLOSED)
var stateIndex = 0
while (resultSet.next()) {
- assert(resultSet.getString("user") == Utils.currentUser)
- assert(resultSet.getString("statement") == sql)
- assert(resultSet.getString("state") == states(stateIndex).toString)
+ assert(resultSet.getString("statement") === sql)
+ assert(resultSet.getString("shouldRunAsync") === "true")
+ assert(resultSet.getString("state") === states(stateIndex).name())
+ assert(resultSet.getString("exception") === null)
+ assert(resultSet.getString("sessionUser") === Utils.currentUser)
stateIndex += 1
}