This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 6e5162e8a7 [KYUUBI #6626] Fix operation never expired issue with
periodical listOperations api calls
6e5162e8a7 is described below
commit 6e5162e8a7a7c3d890aa2020b17306bba507f419
Author: Wang, Fei <[email protected]>
AuthorDate: Sun Aug 18 22:02:25 2024 -0700
[KYUUBI #6626] Fix operation never expired issue with periodical
listOperations api calls
# :mag: Description
## Issue References ๐
This pull request fixes operation never expired issue.
I found that, some Kyuubi operations never expired.
After investigation, the root cause:
1. our kyuubi console lookup the operations periodically
2. it retrieves the kyuubi operation events
3. the KyuubiOperationEvent::apply will get the operation status and update
the operation the last access time, https://github.com/apache/kyuubi/pull/2452
4. then the operation never expired.
## Describe Your Solution ๐ง
Just get the operation event without updating the operation last access
time.
## Types of changes :bookmark:
- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
## Test Plan ๐งช
#### Behavior Without This Pull Request :coffin:
#### Behavior With This Pull Request :tada:
#### Related Unit Tests
---
# Checklist ๐
- [x] This patch was not authored or co-authored using [Generative
Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #6626 from turboFei/operation_not_idle.
Closes #6626
14e6898da [Wang, Fei] Get operation event
Authored-by: Wang, Fei <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
---
.../kyuubi/events/KyuubiOperationEvent.scala | 31 +---------------------
.../apache/kyuubi/operation/KyuubiOperation.scala | 24 +++++++++++++++--
.../org/apache/kyuubi/server/api/ApiUtils.scala | 5 ++--
3 files changed, 25 insertions(+), 35 deletions(-)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiOperationEvent.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiOperationEvent.scala
index 2a103213e5..f67b088a50 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiOperationEvent.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiOperationEvent.scala
@@ -18,8 +18,6 @@
package org.apache.kyuubi.events
import org.apache.kyuubi.Utils
-import org.apache.kyuubi.operation.{KyuubiOperation, OperationHandle}
-import org.apache.kyuubi.session.KyuubiSession
/**
* A [[KyuubiOperationEvent]] used to tracker the lifecycle of an operation at
server side.
@@ -45,7 +43,7 @@ import org.apache.kyuubi.session.KyuubiSession
* @param kyuubiInstance the parent session connection url
* @param metrics the operation metrics
*/
-case class KyuubiOperationEvent private (
+case class KyuubiOperationEvent(
statementId: String,
remoteId: String,
statement: String,
@@ -67,30 +65,3 @@ case class KyuubiOperationEvent private (
override def partitions: Seq[(String, String)] =
("day", Utils.getDateFromTimestamp(createTime)) :: Nil
}
-
-object KyuubiOperationEvent {
-
- /**
- * Shorthand for instantiating a operation event with a [[KyuubiOperation]]
instance
- */
- def apply(operation: KyuubiOperation): KyuubiOperationEvent = {
- val session = operation.getSession.asInstanceOf[KyuubiSession]
- val status = operation.getStatus
- new KyuubiOperationEvent(
- operation.statementId,
-
Option(operation.remoteOpHandle()).map(OperationHandle(_).identifier.toString).orNull,
- operation.statement,
- operation.shouldRunAsync,
- status.state.name(),
- status.lastModified,
- status.create,
- status.start,
- status.completed,
- status.exception,
- session.handle.identifier.toString,
- session.user,
- session.sessionType.toString,
- session.connectionUrl,
- operation.metrics)
- }
-}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
index d0289abb99..a543bddb6c 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
@@ -206,7 +206,7 @@ abstract class KyuubiOperation(session: Session) extends
AbstractOperation(sessi
protected def eventEnabled: Boolean = false
- if (eventEnabled) EventBus.post(KyuubiOperationEvent(this))
+ if (eventEnabled) EventBus.post(getOperationEvent)
override def setState(newState: OperationState): Unit = {
MetricsSystem.tracing { ms =>
@@ -217,6 +217,26 @@ abstract class KyuubiOperation(session: Session) extends
AbstractOperation(sessi
ms.markMeter(MetricRegistry.name(OPERATION_STATE,
newState.toString.toLowerCase))
}
super.setState(newState)
- if (eventEnabled) EventBus.post(KyuubiOperationEvent(this))
+ if (eventEnabled) EventBus.post(getOperationEvent)
+ }
+
+ def getOperationEvent: KyuubiOperationEvent = {
+ val kyuubiSession = session.asInstanceOf[KyuubiSession]
+ KyuubiOperationEvent(
+ statementId,
+
Option(remoteOpHandle()).map(OperationHandle(_).identifier.toString).orNull,
+ statement,
+ shouldRunAsync,
+ state.name(),
+ lastAccessTime,
+ createTime,
+ startTime,
+ completedTime,
+ Option(operationException),
+ kyuubiSession.handle.identifier.toString,
+ kyuubiSession.user,
+ kyuubiSession.sessionType.toString,
+ kyuubiSession.connectionUrl,
+ metrics)
}
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/ApiUtils.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/ApiUtils.scala
index d7d1727c52..1676ba1575 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/ApiUtils.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/ApiUtils.scala
@@ -22,7 +22,6 @@ import scala.collection.JavaConverters._
import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.client.api.v1.dto
import org.apache.kyuubi.client.api.v1.dto.{OperationData, OperationProgress,
ServerData, SessionData}
-import org.apache.kyuubi.events.KyuubiOperationEvent
import org.apache.kyuubi.ha.client.ServiceNodeInfo
import org.apache.kyuubi.operation.KyuubiOperation
import org.apache.kyuubi.session.KyuubiSession
@@ -86,7 +85,7 @@ object ApiUtils extends Logging {
}
def operationEvent(operation: KyuubiOperation): dto.KyuubiOperationEvent = {
- val opEvent = KyuubiOperationEvent(operation)
+ val opEvent = operation.getOperationEvent
dto.KyuubiOperationEvent.builder()
.statementId(opEvent.statementId)
.remoteId(opEvent.remoteId)
@@ -108,7 +107,7 @@ object ApiUtils extends Logging {
}
def operationData(operation: KyuubiOperation): OperationData = {
- val opEvent = KyuubiOperationEvent(operation)
+ val opEvent = operation.getOperationEvent
new OperationData(
opEvent.statementId,
opEvent.remoteId,