This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new fd17dd0  [KYUUBI #1300] Detecting critical errors
fd17dd0 is described below

commit fd17dd0ae429cc62b45ae705b362cb80d6015407
Author: Kent Yao <[email protected]>
AuthorDate: Tue Nov 2 11:46:51 2021 +0800

    [KYUUBI #1300] Detecting critical errors
    
    ### _Why are the changes needed?_
    
    Critical errors on the engine side are not handled properly. For example, when the engine runs out of memory (OOM):
    - the server may be unable to get operation statuses because the engine does not respond. In this case, the client only gets an ambiguous `read timeout` as the final cause.
    - the engine-side OOM hook might crash the engine outright while the server is still trying to get the operation status.
    
    In this PR,
    - add a retry config to make the operation status polling more robust
    - make the engine OOM hook only de-register the engine, so that it can recover from some transient errors
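
The retry behaviour described above can be sketched roughly as follows. This is a minimal, self-contained illustration, not the code in this commit: `TransportFailure`, `StatusPoller`, and `StatusPollerDemo` are hypothetical names, with `TransportFailure` standing in for a raw transport error such as `TTransportException`. The `maxAttempts` budget plays the role of `kyuubi.operation.status.polling.max.attempts`, and the counter resets on every successful poll, so only consecutive failures count.

```scala
// Hypothetical stand-in for a raw transport failure (e.g. TTransportException).
final class TransportFailure(msg: String) extends RuntimeException(msg)

// Hypothetical poller: wraps a status fetch and tracks consecutive failures.
final class StatusPoller[T](maxAttempts: Int, backoffMillis: Long)(fetch: () => T) {
  private var consecutiveFailures = 0

  // Returns Some(status) on success and None after a tolerated failure.
  // Rethrows the failure once the retry budget is exhausted.
  def pollOnce(): Option[T] =
    try {
      val status = fetch()
      consecutiveFailures = 0 // contact re-established, reset the budget
      Some(status)
    } catch {
      case e: TransportFailure if consecutiveFailures >= maxAttempts =>
        throw e
      case _: TransportFailure =>
        consecutiveFailures += 1
        Thread.sleep(backoffMillis) // short pause before the next attempt
        None
    }
}

object StatusPollerDemo {
  // Simulates an engine that fails twice and then answers.
  def run(): String = {
    var calls = 0
    val flaky = () => {
      calls += 1
      if (calls <= 2) throw new TransportFailure("read timeout") else "RUNNING"
    }
    val poller = new StatusPoller[String](maxAttempts = 5, backoffMillis = 1L)(flaky)
    var status: Option[String] = None
    while (status.isEmpty) status = poller.pollOnce()
    status.get
  }
}
```

With a budget of N, this sketch tolerates N consecutive failures and rethrows on the next one, so a transient engine stall is absorbed while an unresponsive engine still surfaces as an error.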
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #1312 from yaooqinn/1300.
    
    Closes #1300
    
    a715ecca [Kent Yao] add comments
    a816b0f0 [Kent Yao] refine
    3557c927 [Kent Yao] add comments
    2ed8bfb4 [Kent Yao] refine
    aefd2a7f [Kent Yao] update doc
    f2ea7e4c [Kent Yao] restore SparkOperation
    386e4eac [Kent Yao] [KYUUBI #1300] Detecting critical errors
    
    Authored-by: Kent Yao <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 docs/deployment/settings.md                        |  1 +
 .../spark/SparkThriftBinaryFrontendService.scala   | 11 ++++-
 .../engine/spark/operation/SparkOperation.scala    |  1 +
 .../org/apache/kyuubi/config/KyuubiConf.scala      |  8 ++++
 .../service/ThriftBinaryFrontendService.scala      | 13 +++--
 .../service/NoopThriftBinaryFrontendService.scala  |  4 +-
 .../kyuubi/client/KyuubiSyncThriftClient.scala     | 37 +++++++++++----
 .../apache/kyuubi/operation/ExecuteStatement.scala | 55 ++++++++++++++--------
 .../apache/kyuubi/operation/KyuubiOperation.scala  | 51 ++++++++++++--------
 .../server/KyuubiThriftBinaryFrontendService.scala |  6 ++-
 10 files changed, 128 insertions(+), 59 deletions(-)

diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index cfe3eaa..be25577 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -273,6 +273,7 @@ kyuubi\.operation\.log<br>\.dir\.root|<div style='width: 
65pt;word-wrap: break-w
 kyuubi\.operation\.plan<br>\.only\.mode|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>NONE</div>|<div style='width: 170pt;word-wrap: 
break-word;white-space: normal'>Whether to perform the statement in a PARSE, 
ANALYZE, OPTIMIZE only way without executing the query. When it is NONE, the 
statement will be fully executed</div>|<div style='width: 
30pt'>string</div>|<div style='width: 20pt'>1.4.0</div>
 kyuubi\.operation<br>\.query\.timeout|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 
170pt;word-wrap: break-word;white-space: normal'>Timeout for query executions 
at server-side, take affect with client-side 
timeout(`java.sql.Statement.setQueryTimeout`) together, a running query will be 
cancelled automatically if timeout. It's off by default, which means only 
client-side take fully control whether the query should timeout or  [...]
 kyuubi\.operation<br>\.scheduler\.pool|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 
170pt;word-wrap: break-word;white-space: normal'>The scheduler pool of job. 
Note that, this config should be used after change Spark config 
spark.scheduler.mode=FAIR.</div>|<div style='width: 30pt'>string</div>|<div 
style='width: 20pt'>1.1.1</div>
+kyuubi\.operation<br>\.status\.polling\.max<br>\.attempts|<div style='width: 
65pt;word-wrap: break-word;white-space: normal'>5</div>|<div style='width: 
170pt;word-wrap: break-word;white-space: normal'>Max attempts for long polling 
asynchronous running sql query's status on raw transport failures, e.g. 
TTransportException</div>|<div style='width: 30pt'>int</div>|<div style='width: 
20pt'>1.4.0</div>
 kyuubi\.operation<br>\.status\.polling<br>\.timeout|<div style='width: 
65pt;word-wrap: break-word;white-space: normal'>PT5S</div>|<div style='width: 
170pt;word-wrap: break-word;white-space: normal'>Timeout(ms) for long polling 
asynchronous running sql query's status</div>|<div style='width: 
30pt'>duration</div>|<div style='width: 20pt'>1.0.0</div>
 
 
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkThriftBinaryFrontendService.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkThriftBinaryFrontendService.scala
index fce4b8e..ee0862d 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkThriftBinaryFrontendService.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkThriftBinaryFrontendService.scala
@@ -31,7 +31,7 @@ import org.apache.kyuubi.util.KyuubiHadoopUtils
 
 class SparkThriftBinaryFrontendService(
     override val serverable: Serverable)
-  extends ThriftBinaryFrontendService("SparkThriftBinaryFrontendService", serverable) {
+  extends ThriftBinaryFrontendService("SparkThriftBinaryFrontendService") {
   import SparkThriftBinaryFrontendService._
 
   private lazy val sc = be.asInstanceOf[SparkSQLBackendService].sparkSession.sparkContext
@@ -147,6 +147,15 @@ class SparkThriftBinaryFrontendService(
       s"${serverAddr.getHostAddress}:$portNum"
     }
   }
+
+  // When a OOM occurs, here we de-register the engine by stop its discoveryService.
+  // Then the current engine will not be connected by new client anymore but keep the existing ones
+  // alive. In this case we can reduce the engine's overhead and make it possible recover from that.
+  // We shall not tear down the whole engine by serverable.stop to make the engine unreachable for
+  // the existing clients which are still getting statuses and reporting to the end-users.
+  override protected def oomHook: Runnable = {
+    () => discoveryService.foreach(_.stop())
+  }
 }
 
 object SparkThriftBinaryFrontendService {
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
index 0024f4a..a304e95 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
@@ -93,6 +93,7 @@ abstract class SparkOperation(spark: SparkSession, opType: OperationType, sessio
           setOperationException(ke)
           throw ke
         } else if (isTerminalState(state)) {
+          setOperationException(KyuubiSQLException(errMsg))
           warn(s"Ignore exception in terminal state with $statementId: $errMsg")
         } else {
           setState(OperationState.ERROR)
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index e38fa8d..5157a3d 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -651,6 +651,14 @@ object KyuubiConf {
       .timeConf
       .createWithDefault(Duration.ofSeconds(5).toMillis)
 
+  val OPERATION_STATUS_POLLING_MAX_ATTEMPTS: ConfigEntry[Int] =
+    buildConf("operation.status.polling.max.attempts")
+      .doc("Max attempts for long polling asynchronous running sql query's status on raw" +
+        " transport failures, e.g. TTransportException")
+      .version("1.4.0")
+      .intConf
+      .createWithDefault(5)
+
   val OPERATION_FORCE_CANCEL: ConfigEntry[Boolean] =
     buildConf("operation.interrupt.on.cancel")
       .doc("When true, all running tasks will be interrupted if one cancels a query. " +
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftBinaryFrontendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftBinaryFrontendService.scala
index 3b3db63..27c74ec 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftBinaryFrontendService.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftBinaryFrontendService.scala
@@ -29,21 +29,18 @@ import org.apache.thrift.protocol.{TBinaryProtocol, TProtocol}
 import org.apache.thrift.server.{ServerContext, TServer, TServerEventHandler, TThreadPoolServer}
 import org.apache.thrift.transport.{TServerSocket, TTransport}
 
-import org.apache.kyuubi.{KyuubiException, KyuubiSQLException, Logging}
-import org.apache.kyuubi.Utils
+import org.apache.kyuubi.{KyuubiException, KyuubiSQLException, Logging, Utils}
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.operation.{FetchOrientation, OperationHandle}
 import org.apache.kyuubi.service.authentication.KyuubiAuthenticationFactory
 import org.apache.kyuubi.session.SessionHandle
 import org.apache.kyuubi.util.{ExecutorPoolCaptureOom, KyuubiHadoopUtils, NamedThreadFactory}
 
-abstract class ThriftBinaryFrontendService(
-    name: String,
-    serverable: Serverable)
+abstract class ThriftBinaryFrontendService(name: String)
   extends AbstractFrontendService(name) with TCLIService.Iface with Runnable with Logging {
 
-  import ThriftBinaryFrontendService._
   import KyuubiConf._
+  import ThriftBinaryFrontendService._
 
   private var server: Option[TServer] = None
   private var serverThread: Thread = _
@@ -54,6 +51,8 @@ abstract class ThriftBinaryFrontendService(
   private var authFactory: KyuubiAuthenticationFactory = _
   private var hadoopConf: Configuration = _
 
+  protected def oomHook: Runnable
+
   override def initialize(conf: KyuubiConf): Unit = {
     this.conf = conf
 
@@ -69,7 +68,7 @@ abstract class ThriftBinaryFrontendService(
         name + "Handler-Pool",
         minThreads, maxThreads,
         keepAliveTime,
-        () => serverable.stop())
+        oomHook)
       authFactory = new KyuubiAuthenticationFactory(conf)
       val transFactory = authFactory.getTTransportFactory
       val tProcFactory = authFactory.getTProcessorFactory(this)
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/NoopThriftBinaryFrontendService.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/NoopThriftBinaryFrontendService.scala
index 18e07f9..8c0b1db 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/NoopThriftBinaryFrontendService.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/NoopThriftBinaryFrontendService.scala
@@ -18,9 +18,11 @@
 package org.apache.kyuubi.service
 
 class NoopThriftBinaryFrontendService(override val serverable: Serverable)
-  extends ThriftBinaryFrontendService("NoopThriftBinaryFrontendService", serverable) {
+  extends ThriftBinaryFrontendService("NoopThriftBinaryFrontendService") {
 
   override val discoveryService: Option[Service] = None
 
   override def connectionUrl: String = serverAddr.getCanonicalHostName + ":" + portNum
+
+  override protected def oomHook: Runnable = () => serverable.stop()
 }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
index de0b278..56547b9 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
@@ -24,13 +24,14 @@ import scala.collection.JavaConverters._
 import org.apache.hive.service.rpc.thrift._
 import org.apache.thrift.protocol.TProtocol
 
-import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.{KyuubiSQLException, Logging}
 import org.apache.kyuubi.operation.FetchOrientation
 import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
 import org.apache.kyuubi.session.SessionHandle
 import org.apache.kyuubi.util.ThriftUtils
 
-class KyuubiSyncThriftClient(protocol: TProtocol) extends TCLIService.Client(protocol) {
+class KyuubiSyncThriftClient(protocol: TProtocol)
+  extends TCLIService.Client(protocol) with Logging {
 
   @volatile private var _remoteSessionHandle: TSessionHandle = _
 
@@ -161,22 +162,30 @@ class KyuubiSyncThriftClient(protocol: TProtocol) extends TCLIService.Client(pro
     resp.getOperationHandle
   }
 
-  override def GetOperationStatus(req: TGetOperationStatusReq): TGetOperationStatusResp = {
-    withLockAcquired {
-      super.GetOperationStatus(req)
-    }
+  def getOperationStatus(operationHandle: TOperationHandle): TGetOperationStatusResp = {
+    val req = new TGetOperationStatusReq(operationHandle)
+    val resp = withLockAcquired(GetOperationStatus(req))
+    resp
   }
 
   def cancelOperation(operationHandle: TOperationHandle): Unit = {
     val req = new TCancelOperationReq(operationHandle)
     val resp = withLockAcquired(CancelOperation(req))
-    ThriftUtils.verifyTStatus(resp.getStatus)
+    if (resp.getStatus.getStatusCode == TStatusCode.SUCCESS_STATUS) {
+      info(s"$req succeed on engine side")
+    } else {
+      warn(s"$req failed on engine side", KyuubiSQLException(resp.getStatus))
+    }
   }
 
   def closeOperation(operationHandle: TOperationHandle): Unit = {
     val req = new TCloseOperationReq(operationHandle)
     val resp = withLockAcquired(CloseOperation(req))
-    ThriftUtils.verifyTStatus(resp.getStatus)
+    if (resp.getStatus.getStatusCode == TStatusCode.SUCCESS_STATUS) {
+      info(s"$req succeed on engine side")
+    } else {
+      warn(s"$req failed on engine side", KyuubiSQLException(resp.getStatus))
+    }
   }
 
   def getResultSetMetadata(operationHandle: TOperationHandle): TTableSchema = {
@@ -210,7 +219,15 @@ class KyuubiSyncThriftClient(protocol: TProtocol) extends TCLIService.Client(pro
     val req = new TRenewDelegationTokenReq()
     req.setSessionHandle(_remoteSessionHandle)
     req.setDelegationToken(encodedCredentials)
-    val resp = RenewDelegationToken(req)
-    ThriftUtils.verifyTStatus(resp.getStatus)
+    try {
+      val resp = withLockAcquired(RenewDelegationToken(req))
+      if (resp.getStatus.getStatusCode == TStatusCode.SUCCESS_STATUS) {
+        debug(s"$req succeed on engine side")
+      } else {
+        warn(s"$req failed on engine side", KyuubiSQLException(resp.getStatus))
+      }
+    } catch {
+      case e: Exception => warn(s"$req failed on engine side", e)
+    }
   }
 }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
index 40a072f..e3fdb9b 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
@@ -19,14 +19,17 @@ package org.apache.kyuubi.operation
 
 import scala.collection.JavaConverters._
 
-import org.apache.hive.service.rpc.thrift.{TFetchOrientation, TFetchResultsReq, TGetOperationStatusReq}
+import org.apache.hive.service.rpc.thrift.TGetOperationStatusResp
 import org.apache.hive.service.rpc.thrift.TOperationState._
+import org.apache.thrift.TException
 
 import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.client.KyuubiSyncThriftClient
+import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.events.KyuubiStatementEvent
 import org.apache.kyuubi.metrics.MetricsConstants._
 import org.apache.kyuubi.metrics.MetricsSystem
+import org.apache.kyuubi.operation.FetchOrientation.FETCH_NEXT
 import org.apache.kyuubi.operation.OperationState.OperationState
 import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.server.EventLoggingService
@@ -50,15 +53,12 @@ class ExecuteStatement(
     null
   }
 
-  override def getOperationLog: Option[OperationLog] = Option(_operationLog)
-
-  private lazy val statusReq = new TGetOperationStatusReq(_remoteOpHandle)
-  private lazy val fetchLogReq = {
-    val req = new TFetchResultsReq(_remoteOpHandle, TFetchOrientation.FETCH_NEXT, 1000)
-    req.setFetchType(1.toShort)
-    req
+  private val maxStatusPollOnFailure = {
+    session.sessionManager.getConf.get(KyuubiConf.OPERATION_STATUS_POLLING_MAX_ATTEMPTS)
   }
 
+  override def getOperationLog: Option[OperationLog] = Option(_operationLog)
+
   EventLoggingService.onEvent(statementEvent)
 
   override def beforeRun(): Unit = {
@@ -83,7 +83,29 @@ class ExecuteStatement(
 
   private def waitStatementComplete(): Unit = try {
     setState(OperationState.RUNNING)
-    var statusResp = client.GetOperationStatus(statusReq)
+    var statusResp: TGetOperationStatusResp = null
+    var currentAttempts = 0
+
+    def getOperationStatusWithRetry: Unit = {
+      try {
+        statusResp = client.getOperationStatus(_remoteOpHandle)
+        currentAttempts = 0 // reset attempts whenever get touch with engine again
+      } catch {
+        case e: TException if currentAttempts >= maxStatusPollOnFailure =>
+          error(s"Failed to get ${session.user}'s query[$getHandle] status after" +
+            s" $maxStatusPollOnFailure times, aborting", e)
+          throw e
+        case e: TException =>
+          currentAttempts += 1
+          warn(s"Failed to get ${session.user}'s query[$getHandle] status" +
+            s" ($currentAttempts / $maxStatusPollOnFailure)", e)
+          Thread.sleep(100)
+      }
+    }
+
+    // initialize operation status
+    while (statusResp == null) { getOperationStatusWithRetry }
+
     var isComplete = false
     while (!isComplete) {
       fetchQueryLog()
@@ -94,7 +116,7 @@ class ExecuteStatement(
       remoteState match {
         case INITIALIZED_STATE | PENDING_STATE | RUNNING_STATE =>
           isComplete = false
-          statusResp = client.GetOperationStatus(statusReq)
+          getOperationStatusWithRetry
 
         case FINISHED_STATE =>
           setState(OperationState.FINISHED)
@@ -109,14 +131,10 @@ class ExecuteStatement(
           setState(OperationState.TIMEOUT)
 
         case ERROR_STATE =>
-          setState(OperationState.ERROR)
-          val ke = KyuubiSQLException(statusResp.getErrorMessage)
-          setOperationException(ke)
+          throw KyuubiSQLException(statusResp.getErrorMessage)
 
         case UKNOWN_STATE =>
-          setState(OperationState.ERROR)
-          val ke = KyuubiSQLException(s"UNKNOWN STATE for $statement")
-          setOperationException(ke)
+          throw KyuubiSQLException(s"UNKNOWN STATE for $statement")
       }
       sendCredentialsIfNeeded()
     }
@@ -136,9 +154,8 @@ class ExecuteStatement(
   private def fetchQueryLog(): Unit = {
     getOperationLog.foreach { logger =>
       try {
-        val resp = client.FetchResults(fetchLogReq)
-        verifyTStatus(resp.getStatus)
-        val logs = resp.getResults.getColumns.get(0).getStringVal.getValues.asScala
+        val ret = client.fetchResults(_remoteOpHandle, FETCH_NEXT, 1000, fetchLog = true)
+        val logs = ret.getColumns.get(0).getStringVal.getValues.asScala
         logs.foreach(log => logger.write(log + "\n"))
       } catch {
         case _: Exception => // do nothing
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
index 2568158..dc6ae77 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
@@ -22,9 +22,10 @@ import java.io.IOException
 import com.codahale.metrics.MetricRegistry
 import org.apache.commons.lang3.StringUtils
 import org.apache.hive.service.rpc.thrift._
+import org.apache.thrift.TException
 import org.apache.thrift.transport.TTransportException
 
-import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.{KyuubiSQLException, Utils}
 import org.apache.kyuubi.client.KyuubiSyncThriftClient
 import org.apache.kyuubi.metrics.MetricsConstants.STATEMENT_FAIL
 import org.apache.kyuubi.metrics.MetricsSystem
@@ -64,8 +65,8 @@ abstract class KyuubiOperation(
               // https://issues.apache.org/jira/browse/THRIFT-4858
               KyuubiSQLException(
                 s"Error $action $opType: Socket for ${session.handle} is closed", e)
-            case _ =>
-              KyuubiSQLException(s"Error $action $opType: ${e.getMessage}", e)
+            case e =>
+              KyuubiSQLException(s"Error $action $opType: ${Utils.stringifyException(e)}", e)
           }
           setOperationException(ke)
           throw ke
@@ -86,27 +87,37 @@ abstract class KyuubiOperation(
     }
   }
 
-  override def cancel(): Unit = {
-    if (_remoteOpHandle != null && !isClosedOrCanceled) {
-      try {
-        client.cancelOperation(_remoteOpHandle)
-        setState(OperationState.CANCELED)
-      } catch onError("cancelling")
+  override def cancel(): Unit = state.synchronized {
+    if (!isClosedOrCanceled) {
+      setState(OperationState.CANCELED)
+      if (_remoteOpHandle != null) {
+        try {
+          client.cancelOperation(_remoteOpHandle)
+        } catch {
+          case e @ (_: TException | _: KyuubiSQLException) =>
+            warn(s"Error cancelling ${_remoteOpHandle.getOperationId}: ${e.getMessage}", e)
+        }
+      }
     }
   }
 
-  override def close(): Unit = {
-    if (_remoteOpHandle != null && !isClosedOrCanceled) {
-      try {
-        getOperationLog.foreach(_.close())
-      } catch {
-        case e: IOException =>
-          error(e.getMessage, e)
+  override def close(): Unit = state.synchronized {
+    if (!isClosedOrCanceled) {
+      setState(OperationState.CLOSED)
+      if (_remoteOpHandle != null) {
+        try {
+          getOperationLog.foreach(_.close())
+        } catch {
+          case e: IOException => error(e.getMessage, e)
+        }
+
+        try {
+          client.closeOperation(_remoteOpHandle)
+        } catch {
+          case e @(_: TException | _: KyuubiSQLException) =>
+            warn(s"Error closing ${_remoteOpHandle.getOperationId}: ${e.getMessage}", e)
+        }
       }
-      try {
-        client.closeOperation(_remoteOpHandle)
-        setState(OperationState.CLOSED)
-      } catch onError("closing")
     }
   }
 
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiThriftBinaryFrontendService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiThriftBinaryFrontendService.scala
index 77d3e60..1c1710e 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiThriftBinaryFrontendService.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiThriftBinaryFrontendService.scala
@@ -22,7 +22,7 @@ import org.apache.kyuubi.service.{Serverable, Service, ThriftBinaryFrontendServi
 
 class KyuubiThriftBinaryFrontendService(
     override val serverable: Serverable)
-  extends ThriftBinaryFrontendService("KyuubiThriftBinaryFrontendService", serverable) {
+  extends ThriftBinaryFrontendService("KyuubiThriftBinaryFrontendService") {
 
   override lazy val discoveryService: Option[Service] = {
     if (ServiceDiscovery.supportServiceDiscovery(conf)) {
@@ -36,4 +36,8 @@ class KyuubiThriftBinaryFrontendService(
     checkInitialized()
     s"${serverAddr.getCanonicalHostName}:$portNum"
   }
+
+  override protected def oomHook: Runnable = {
+    () => serverable.stop()
+  }
 }
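
The `oomHook` refactoring in this patch follows a simple pattern: the base frontend declares the hook as abstract, and each concrete frontend decides how to react. The sketch below illustrates that pattern in isolation, under stated assumptions. `Stoppable`, `Recorder`, `Frontend`, `ServerFrontend`, `EngineFrontend`, and `simulateOom` are hypothetical names, not Kyuubi classes, and the hook is triggered by hand here rather than by a thread pool.

```scala
// Hypothetical stand-in for anything with a stop() lifecycle method.
trait Stoppable { def stop(): Unit }

// Records whether stop() was called, so the effect of each hook is observable.
final class Recorder extends Stoppable {
  @volatile var stopped: Boolean = false
  override def stop(): Unit = { stopped = true }
}

// The base class leaves the OOM reaction to subclasses instead of hard-coding it.
abstract class Frontend {
  protected def oomHook: Runnable

  // Triggered by hand in this sketch.
  def simulateOom(): Unit = oomHook.run()
}

// Server-style reaction: stop the whole service.
final class ServerFrontend(server: Stoppable) extends Frontend {
  override protected def oomHook: Runnable = () => server.stop()
}

// Engine-style reaction: stop only discovery, so no new clients arrive
// while the engine itself keeps serving the existing ones.
final class EngineFrontend(engine: Stoppable, discovery: Option[Stoppable]) extends Frontend {
  override protected def oomHook: Runnable = () => discovery.foreach(_.stop())
}
```

In this sketch, calling `simulateOom()` on an `EngineFrontend` stops the discovery recorder but leaves the engine recorder untouched, which mirrors the intent of keeping existing clients able to fetch statuses.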
