This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9db2e5c  [KYUUBI #1356] Refine KyuubiStatementEvent
9db2e5c is described below

commit 9db2e5c054e3237411e36b1a410e06f98a633e66
Author: Kent Yao <[email protected]>
AuthorDate: Wed Nov 10 12:09:04 2021 +0800

    [KYUUBI #1356] Refine KyuubiStatementEvent
    
    <!--
    Thanks for sending a pull request!
    
    Here are some tips for you:
      1. If this is your first time, please read our contributor guidelines: 
https://kyuubi.readthedocs.io/en/latest/community/contributions.html
      2. If the PR is related to an issue in 
https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your 
PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
      3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., 
'[WIP][KYUUBI #XXXX] Your PR title ...'.
    -->
    
    ### _Why are the changes needed?_
    <!--
    Please clarify why the changes are needed. For instance,
      1. If you add a feature, you can talk about the use case of it.
      2. If you fix a bug, you can clarify why it is a bug.
    -->
    1. When a statement fails, the event is logged twice, once in 
`setState` and once in `setOperationException`; this change logs it only once
    2. We break down KyuubiStatementEvent into three parts - statement basis, 
status, and sessionId/user; see the improved comments
    3. Log the remote operation handle too
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run 
test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests)
 locally before making a pull request
    
    Closes #1356 from yaooqinn/se.
    
    Closes #1356
    
    4b5d0de2 [Kent Yao] Refine KyuubiStatementEvent
    e7228ddd [Kent Yao] Refine KyuubiStatementEvent
    
    Authored-by: Kent Yao <[email protected]>
    Signed-off-by: ulysses-you <[email protected]>
---
 .../kyuubi/operation/AbstractOperation.scala       |  6 +-
 .../apache/kyuubi/operation/OperationStatus.scala  |  2 +
 .../kyuubi/operation/OperationStatusSuite.scala    |  2 +-
 .../kyuubi/events/KyuubiStatementEvent.scala       | 74 ++++++++++++++--------
 .../apache/kyuubi/operation/ExecuteStatement.scala | 19 +-----
 .../apache/kyuubi/operation/KyuubiOperation.scala  |  2 +-
 .../kyuubi/events/EventLoggingServiceSuite.scala   | 11 ++--
 7 files changed, 64 insertions(+), 52 deletions(-)

diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
index 02cc6d7..e017790 100644
--- 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
@@ -32,6 +32,7 @@ import org.apache.kyuubi.session.Session
 abstract class AbstractOperation(opType: OperationType, session: Session)
   extends Operation with Logging {
 
+  private final val createTime = System.currentTimeMillis()
   private final val handle = OperationHandle(opType, session.protocol)
   private final val operationTimeout: Long = {
     session.sessionManager.getConf.get(OPERATION_IDLE_TIMEOUT)
@@ -44,7 +45,7 @@ abstract class AbstractOperation(opType: OperationType, 
session: Session)
   @volatile protected var state: OperationState = INITIALIZED
   @volatile protected var startTime: Long = _
   @volatile protected var completedTime: Long = _
-  @volatile protected var lastAccessTime: Long = System.currentTimeMillis()
+  @volatile protected var lastAccessTime: Long = createTime
 
   @volatile protected var operationException: KyuubiSQLException = _
   @volatile protected var hasResultSet: Boolean = false
@@ -147,7 +148,8 @@ abstract class AbstractOperation(opType: OperationType, 
session: Session)
   override def getHandle: OperationHandle = handle
 
   override def getStatus: OperationStatus = {
-    OperationStatus(state, startTime, completedTime, hasResultSet, 
Option(operationException))
+    OperationStatus(state, createTime, startTime, lastAccessTime, 
completedTime, hasResultSet,
+      Option(operationException))
   }
 
   override def shouldRunAsync: Boolean
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationStatus.scala
 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationStatus.scala
index de813df..686dc6e 100644
--- 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationStatus.scala
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationStatus.scala
@@ -22,7 +22,9 @@ import 
org.apache.kyuubi.operation.OperationState.OperationState
 
 case class OperationStatus(
     state: OperationState,
+    create: Long,
     start: Long,
+    lastModified: Long,
     completed: Long,
     hasResultSet: Boolean,
     exception: Option[KyuubiSQLException] = None)
diff --git 
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/OperationStatusSuite.scala
 
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/OperationStatusSuite.scala
index a84c063..4ec43ba 100644
--- 
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/OperationStatusSuite.scala
+++ 
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/OperationStatusSuite.scala
@@ -22,7 +22,7 @@ import org.apache.kyuubi.{KyuubiFunSuite, KyuubiSQLException}
 class OperationStatusSuite extends KyuubiFunSuite {
 
   test("operation status") {
-    val status = OperationStatus(OperationState.INITIALIZED, 0, 0, 
hasResultSet = false)
+    val status = OperationStatus(OperationState.INITIALIZED, 0, 0, 0, 0, 
hasResultSet = false)
     assert(status.exception.isEmpty)
     val status1 = status.copy(exception = Some(KyuubiSQLException("nothing")))
     assert(status1.exception.get.getMessage === "nothing")
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiStatementEvent.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiStatementEvent.scala
index d7da77d..e405954 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiStatementEvent.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiStatementEvent.scala
@@ -18,49 +18,69 @@
 package org.apache.kyuubi.events
 
 import org.apache.kyuubi.Utils
-import org.apache.kyuubi.operation.ExecuteStatement
-import org.apache.kyuubi.operation.OperationState.OperationState
+import org.apache.kyuubi.operation.{ExecuteStatement, OperationHandle}
 
 /**
+ * A [[KyuubiStatementEvent]] is used to track the lifecycle of a statement on the 
server side.
+ * <ul>
+ *   <li>Statement Basis</li>
+ *   <li>Statement Live Status</li>
+ *   <li>Parent Session Id</li>
+ * </ul>
  *
- * @param user: who connect to kyuubi server
- * @param statementId: the identifier of operationHandler
- * @param statement: the sql that you execute
- * @param remoteIp: the ip of user
- * @param sessionId: the identifier of a session
- * @param createTime: the create time of this statement
- * @param state: store each state that the sql has
- * @param stateTime: the time that the sql's state change
+ * @param statementId the unique identifier of a single statement
+ * @param remoteId the unique identifier of a single statement at engine side
+ * @param statement the sql that you execute
+ * @param shouldRunAsync the flag indicating whether the query runs 
asynchronously or not
+ * @param state the current operation state
+ * @param eventTime the time when the event was created & logged (the operation's last access time)
+ * @param createTime the time when the statement operation was created
+ * @param startTime the time when the query started to run
+ * @param completeTime the time when the query ended
  * @param exception: caught exception if have
+ * @param sessionId the identifier of the parent session
+ * @param sessionUser the authenticated client user
  */
-case class KyuubiStatementEvent(
-    user: String,
+case class KyuubiStatementEvent private (
     statementId: String,
+    remoteId: String,
     statement: String,
-    remoteIp: String,
-    sessionId: String,
+    shouldRunAsync: Boolean,
+    state: String,
+    eventTime: Long,
     createTime: Long,
-    var state: String,
-    var stateTime: Long,
-    var exception: String = "") extends KyuubiServerEvent {
+    startTime: Long,
+    completeTime: Long,
+    exception: Option[Throwable],
+    sessionId: String,
+    sessionUser: String) extends KyuubiServerEvent {
+
+  // statement events are partitioned by the date when the corresponding 
operations are
+  // created.
   override def partitions: Seq[(String, String)] =
     ("day", Utils.getDateFromTimestamp(createTime)) :: Nil
 }
 
 object KyuubiStatementEvent {
-  def apply(statement: ExecuteStatement,
-      statementId: String,
-      state: OperationState,
-      stateTime: Long): KyuubiStatementEvent = {
+
+  /**
+   * Shorthand for instantiating a statement event with an [[ExecuteStatement]] 
instance
+   */
+  def apply(statement: ExecuteStatement): KyuubiStatementEvent = {
     val session = statement.getSession
+    val status = statement.getStatus
     new KyuubiStatementEvent(
-      session.user,
-      statementId,
+      statement.getHandle.identifier.toString,
+      
Option(statement.remoteOpHandle()).map(OperationHandle(_).identifier.toString).orNull,
       statement.statement,
-      session.ipAddress,
+      statement.shouldRunAsync,
+      status.state.name(),
+      status.lastModified,
+      status.create,
+      status.start,
+      status.completed,
+      status.exception,
       session.handle.identifier.toString,
-      stateTime,
-      state.toString,
-      stateTime)
+      session.user)
   }
 }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
index 6df933b..643080a 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
@@ -41,11 +41,8 @@ class ExecuteStatement(
     override val statement: String,
     override val shouldRunAsync: Boolean,
     queryTimeout: Long)
-  extends KyuubiOperation(
-    OperationType.EXECUTE_STATEMENT, session, client) {
-
-  val statementEvent: KyuubiStatementEvent =
-    KyuubiStatementEvent(this, statementId, state, lastAccessTime)
+  extends KyuubiOperation(OperationType.EXECUTE_STATEMENT, session, client) {
+  EventLoggingService.onEvent(KyuubiStatementEvent(this))
 
   private final val _operationLog: OperationLog = if (shouldRunAsync) {
     OperationLog.createOperationLog(session, getHandle)
@@ -59,8 +56,6 @@ class ExecuteStatement(
 
   override def getOperationLog: Option[OperationLog] = Option(_operationLog)
 
-  EventLoggingService.onEvent(statementEvent)
-
   override def beforeRun(): Unit = {
     OperationLog.setCurrentOperationLog(_operationLog)
     setHasResultSet(true)
@@ -180,15 +175,7 @@ class ExecuteStatement(
 
   override def setState(newState: OperationState): Unit = {
     super.setState(newState)
-    statementEvent.state = newState.toString
-    statementEvent.stateTime = lastAccessTime
-    EventLoggingService.onEvent(statementEvent)
-  }
-
-  override def setOperationException(opEx: KyuubiSQLException): Unit = {
-    super.setOperationException(opEx)
-    statementEvent.exception = opEx.toString
-    EventLoggingService.onEvent(statementEvent)
+    EventLoggingService.onEvent(KyuubiStatementEvent(this))
   }
 
   override def close(): Unit = {
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
index dc6ae77..a69814d 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
@@ -57,7 +57,6 @@ abstract class KyuubiOperation(
           MetricsSystem.tracing {
             _.incCount(MetricRegistry.name(STATEMENT_FAIL, errorType))
           }
-          setState(OperationState.ERROR)
           val ke = e match {
             case kse: KyuubiSQLException => kse
             case te: TTransportException if te.getType == 
TTransportException.END_OF_FILE &&
@@ -69,6 +68,7 @@ abstract class KyuubiOperation(
               KyuubiSQLException(s"Error $action $opType: 
${Utils.stringifyException(e)}", e)
           }
           setOperationException(ke)
+          setState(OperationState.ERROR)
           throw ke
         }
       }
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala
index 59fcf37..7739861 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/EventLoggingServiceSuite.scala
@@ -48,7 +48,7 @@ class EventLoggingServiceSuite extends WithKyuubiServer with 
HiveJDBCTestHelper
 
   override protected def jdbcUrl: String = getJdbcUrl
 
-  test("statementEvent: generate, dump and query") {
+  test("round-trip for logging and querying statement events for both kyuubi 
server and engine") {
     val hostName = InetAddress.getLocalHost.getCanonicalHostName
     val serverStatementEventPath =
       Paths.get(serverLogRoot, "kyuubi_statement", s"day=$currentDate", 
s"server-$hostName.json")
@@ -58,7 +58,6 @@ class EventLoggingServiceSuite extends WithKyuubiServer with 
HiveJDBCTestHelper
 
     withJdbcStatement() { statement =>
       statement.execute(sql)
-
       // check server statement events
       val serverTable = serverStatementEventPath.getParent
       val resultSet = statement.executeQuery(s"SELECT * FROM 
`json`.`${serverTable}`" +
@@ -66,9 +65,11 @@ class EventLoggingServiceSuite extends WithKyuubiServer with 
HiveJDBCTestHelper
       val states = Array(INITIALIZED, PENDING, RUNNING, FINISHED, CLOSED)
       var stateIndex = 0
       while (resultSet.next()) {
-        assert(resultSet.getString("user") == Utils.currentUser)
-        assert(resultSet.getString("statement") == sql)
-        assert(resultSet.getString("state") == states(stateIndex).toString)
+        assert(resultSet.getString("statement") === sql)
+        assert(resultSet.getString("shouldRunAsync") === "true")
+        assert(resultSet.getString("state") === states(stateIndex).name())
+        assert(resultSet.getString("exception") === null)
+        assert(resultSet.getString("sessionUser") === Utils.currentUser)
         stateIndex += 1
       }
 

Reply via email to