This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new fd17dd0 [KYUUBI #1300] Detecting critical errors
fd17dd0 is described below
commit fd17dd0ae429cc62b45ae705b362cb80d6015407
Author: Kent Yao <[email protected]>
AuthorDate: Tue Nov 2 11:46:51 2021 +0800
[KYUUBI #1300] Detecting critical errors
### _Why are the changes needed?_
Critical errors on the engine side are not handled properly. For example,
when the engine runs out of memory (OOM):
- the server may be unable to get operation statuses because the engine does
not respond. In this case, the client only gets an ambiguous `read timeout`
as the final cause.
- the OOM hook on the engine side might crash the engine directly while the
server is still trying to get the operation status.
This PR:
- adds a retry config, `kyuubi.operation.status.polling.max.attempts`, to make
the operation status updating process more robust
- makes the engine OOM hook only de-register the engine, so that it can
recover from some transient errors
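The retry behaviour described above can be sketched in isolation. This is a minimal illustration rather than the code in this patch: `StatusPollingSketch`, `pollWithRetry`, `TransportFailure` and `backoffMs` are hypothetical names, and `TransportFailure` stands in for Thrift's `TException`. Only the idea (tolerate a bounded number of consecutive transport failures, then abort) is taken from the change.

```scala
object StatusPollingSketch {
  // Hypothetical stand-in for a raw transport failure (e.g. TTransportException).
  final class TransportFailure(msg: String) extends Exception(msg)

  /**
   * Calls `fetch` until it succeeds. Up to `maxAttempts` consecutive
   * transport failures are tolerated; the next failure is rethrown.
   * The failure counter is local, so every successful call starts from zero.
   */
  def pollWithRetry[T](maxAttempts: Int, backoffMs: Long)(fetch: () => T): T = {
    var failures = 0
    var result: Option[T] = None
    while (result.isEmpty) {
      try {
        result = Some(fetch())
      } catch {
        // Retry budget exhausted: surface the failure to the caller.
        case e: TransportFailure if failures >= maxAttempts => throw e
        // Still within budget: count the failure, back off briefly, retry.
        case _: TransportFailure =>
          failures += 1
          Thread.sleep(backoffMs)
      }
    }
    result.get
  }
}
```

With `maxAttempts = 5` (the default of the new config) and `backoffMs = 100` (the sleep used in the patch), a poll that fails twice and then succeeds returns normally, while an engine that never responds fails on the sixth consecutive error.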
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #1312 from yaooqinn/1300.
Closes #1300
a715ecca [Kent Yao] add comments
a816b0f0 [Kent Yao] refine
3557c927 [Kent Yao] add comments
2ed8bfb4 [Kent Yao] refine
aefd2a7f [Kent Yao] update doc
f2ea7e4c [Kent Yao] restore SparkOperation
386e4eac [Kent Yao] [KYUUBI #1300] Detecting critical errors
Authored-by: Kent Yao <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
docs/deployment/settings.md | 1 +
.../spark/SparkThriftBinaryFrontendService.scala | 11 ++++-
.../engine/spark/operation/SparkOperation.scala | 1 +
.../org/apache/kyuubi/config/KyuubiConf.scala | 8 ++++
.../service/ThriftBinaryFrontendService.scala | 13 +++--
.../service/NoopThriftBinaryFrontendService.scala | 4 +-
.../kyuubi/client/KyuubiSyncThriftClient.scala | 37 +++++++++++----
.../apache/kyuubi/operation/ExecuteStatement.scala | 55 ++++++++++++++--------
.../apache/kyuubi/operation/KyuubiOperation.scala | 51 ++++++++++++--------
.../server/KyuubiThriftBinaryFrontendService.scala | 6 ++-
10 files changed, 128 insertions(+), 59 deletions(-)
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index cfe3eaa..be25577 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -273,6 +273,7 @@ kyuubi\.operation\.log<br>\.dir\.root|<div style='width: 65pt;word-wrap: break-w
kyuubi\.operation\.plan<br>\.only\.mode|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>NONE</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Whether to perform the statement in a PARSE, ANALYZE, OPTIMIZE only way without executing the query. When it is NONE, the statement will be fully executed</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.4.0</div>
kyuubi\.operation<br>\.query\.timeout|<div style='width: 65pt;word-wrap: break-word;white-space: normal'><undefined></div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Timeout for query executions at server-side, take affect with client-side timeout(`java.sql.Statement.setQueryTimeout`) together, a running query will be cancelled automatically if timeout. It's off by default, which means only client-side take fully control whether the query should timeout or [...]
kyuubi\.operation<br>\.scheduler\.pool|<div style='width: 65pt;word-wrap: break-word;white-space: normal'><undefined></div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The scheduler pool of job. Note that, this config should be used after change Spark config spark.scheduler.mode=FAIR.</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.1.1</div>
+kyuubi\.operation<br>\.status\.polling\.max<br>\.attempts|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>5</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Max attempts for long polling asynchronous running sql query's status on raw transport failures, e.g. TTransportException</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.4.0</div>
kyuubi\.operation<br>\.status\.polling<br>\.timeout|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT5S</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Timeout(ms) for long polling asynchronous running sql query's status</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.0.0</div>
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkThriftBinaryFrontendService.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkThriftBinaryFrontendService.scala
index fce4b8e..ee0862d 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkThriftBinaryFrontendService.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkThriftBinaryFrontendService.scala
@@ -31,7 +31,7 @@ import org.apache.kyuubi.util.KyuubiHadoopUtils
class SparkThriftBinaryFrontendService(
override val serverable: Serverable)
- extends ThriftBinaryFrontendService("SparkThriftBinaryFrontendService", serverable) {
+ extends ThriftBinaryFrontendService("SparkThriftBinaryFrontendService") {
import SparkThriftBinaryFrontendService._
private lazy val sc =
be.asInstanceOf[SparkSQLBackendService].sparkSession.sparkContext
@@ -147,6 +147,15 @@ class SparkThriftBinaryFrontendService(
s"${serverAddr.getHostAddress}:$portNum"
}
}
+
+ // When a OOM occurs, here we de-register the engine by stop its discoveryService.
+ // Then the current engine will not be connected by new client anymore but keep the existing ones
+ // alive. In this case we can reduce the engine's overhead and make it possible recover from that.
+ // We shall not tear down the whole engine by serverable.stop to make the engine unreachable for
+ // the existing clients which are still getting statuses and reporting to the end-users.
+ override protected def oomHook: Runnable = {
+ () => discoveryService.foreach(_.stop())
+ }
}
object SparkThriftBinaryFrontendService {
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
index 0024f4a..a304e95 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
@@ -93,6 +93,7 @@ abstract class SparkOperation(spark: SparkSession, opType: OperationType, sessio
setOperationException(ke)
throw ke
} else if (isTerminalState(state)) {
+ setOperationException(KyuubiSQLException(errMsg))
warn(s"Ignore exception in terminal state with $statementId: $errMsg")
} else {
setState(OperationState.ERROR)
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index e38fa8d..5157a3d 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -651,6 +651,14 @@ object KyuubiConf {
.timeConf
.createWithDefault(Duration.ofSeconds(5).toMillis)
+ val OPERATION_STATUS_POLLING_MAX_ATTEMPTS: ConfigEntry[Int] =
+ buildConf("operation.status.polling.max.attempts")
+ .doc("Max attempts for long polling asynchronous running sql query's status on raw" +
+ " transport failures, e.g. TTransportException")
+ .version("1.4.0")
+ .intConf
+ .createWithDefault(5)
+
val OPERATION_FORCE_CANCEL: ConfigEntry[Boolean] =
buildConf("operation.interrupt.on.cancel")
.doc("When true, all running tasks will be interrupted if one cancels a query. " +
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftBinaryFrontendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftBinaryFrontendService.scala
index 3b3db63..27c74ec 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftBinaryFrontendService.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftBinaryFrontendService.scala
@@ -29,21 +29,18 @@ import org.apache.thrift.protocol.{TBinaryProtocol, TProtocol}
import org.apache.thrift.server.{ServerContext, TServer, TServerEventHandler, TThreadPoolServer}
import org.apache.thrift.transport.{TServerSocket, TTransport}
-import org.apache.kyuubi.{KyuubiException, KyuubiSQLException, Logging}
-import org.apache.kyuubi.Utils
+import org.apache.kyuubi.{KyuubiException, KyuubiSQLException, Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.operation.{FetchOrientation, OperationHandle}
import org.apache.kyuubi.service.authentication.KyuubiAuthenticationFactory
import org.apache.kyuubi.session.SessionHandle
import org.apache.kyuubi.util.{ExecutorPoolCaptureOom, KyuubiHadoopUtils, NamedThreadFactory}
-abstract class ThriftBinaryFrontendService(
- name: String,
- serverable: Serverable)
+abstract class ThriftBinaryFrontendService(name: String)
extends AbstractFrontendService(name) with TCLIService.Iface with Runnable with Logging {
- import ThriftBinaryFrontendService._
import KyuubiConf._
+ import ThriftBinaryFrontendService._
private var server: Option[TServer] = None
private var serverThread: Thread = _
@@ -54,6 +51,8 @@ abstract class ThriftBinaryFrontendService(
private var authFactory: KyuubiAuthenticationFactory = _
private var hadoopConf: Configuration = _
+ protected def oomHook: Runnable
+
override def initialize(conf: KyuubiConf): Unit = {
this.conf = conf
@@ -69,7 +68,7 @@ abstract class ThriftBinaryFrontendService(
name + "Handler-Pool",
minThreads, maxThreads,
keepAliveTime,
- () => serverable.stop())
+ oomHook)
authFactory = new KyuubiAuthenticationFactory(conf)
val transFactory = authFactory.getTTransportFactory
val tProcFactory = authFactory.getTProcessorFactory(this)
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/NoopThriftBinaryFrontendService.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/NoopThriftBinaryFrontendService.scala
index 18e07f9..8c0b1db 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/NoopThriftBinaryFrontendService.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/NoopThriftBinaryFrontendService.scala
@@ -18,9 +18,11 @@
package org.apache.kyuubi.service
class NoopThriftBinaryFrontendService(override val serverable: Serverable)
- extends ThriftBinaryFrontendService("NoopThriftBinaryFrontendService", serverable) {
+ extends ThriftBinaryFrontendService("NoopThriftBinaryFrontendService") {
override val discoveryService: Option[Service] = None
override def connectionUrl: String = serverAddr.getCanonicalHostName + ":" + portNum
+
+ override protected def oomHook: Runnable = () => serverable.stop()
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
index de0b278..56547b9 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
@@ -24,13 +24,14 @@ import scala.collection.JavaConverters._
import org.apache.hive.service.rpc.thrift._
import org.apache.thrift.protocol.TProtocol
-import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.operation.FetchOrientation
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.session.SessionHandle
import org.apache.kyuubi.util.ThriftUtils
-class KyuubiSyncThriftClient(protocol: TProtocol) extends TCLIService.Client(protocol) {
+class KyuubiSyncThriftClient(protocol: TProtocol)
+ extends TCLIService.Client(protocol) with Logging {
@volatile private var _remoteSessionHandle: TSessionHandle = _
@@ -161,22 +162,30 @@ class KyuubiSyncThriftClient(protocol: TProtocol) extends TCLIService.Client(pro
resp.getOperationHandle
}
- override def GetOperationStatus(req: TGetOperationStatusReq): TGetOperationStatusResp = {
- withLockAcquired {
- super.GetOperationStatus(req)
- }
+ def getOperationStatus(operationHandle: TOperationHandle): TGetOperationStatusResp = {
+ val req = new TGetOperationStatusReq(operationHandle)
+ val resp = withLockAcquired(GetOperationStatus(req))
+ resp
}
def cancelOperation(operationHandle: TOperationHandle): Unit = {
val req = new TCancelOperationReq(operationHandle)
val resp = withLockAcquired(CancelOperation(req))
- ThriftUtils.verifyTStatus(resp.getStatus)
+ if (resp.getStatus.getStatusCode == TStatusCode.SUCCESS_STATUS) {
+ info(s"$req succeed on engine side")
+ } else {
+ warn(s"$req failed on engine side", KyuubiSQLException(resp.getStatus))
+ }
}
def closeOperation(operationHandle: TOperationHandle): Unit = {
val req = new TCloseOperationReq(operationHandle)
val resp = withLockAcquired(CloseOperation(req))
- ThriftUtils.verifyTStatus(resp.getStatus)
+ if (resp.getStatus.getStatusCode == TStatusCode.SUCCESS_STATUS) {
+ info(s"$req succeed on engine side")
+ } else {
+ warn(s"$req failed on engine side", KyuubiSQLException(resp.getStatus))
+ }
}
def getResultSetMetadata(operationHandle: TOperationHandle): TTableSchema = {
@@ -210,7 +219,15 @@ class KyuubiSyncThriftClient(protocol: TProtocol) extends TCLIService.Client(pro
val req = new TRenewDelegationTokenReq()
req.setSessionHandle(_remoteSessionHandle)
req.setDelegationToken(encodedCredentials)
- val resp = RenewDelegationToken(req)
- ThriftUtils.verifyTStatus(resp.getStatus)
+ try {
+ val resp = withLockAcquired(RenewDelegationToken(req))
+ if (resp.getStatus.getStatusCode == TStatusCode.SUCCESS_STATUS) {
+ debug(s"$req succeed on engine side")
+ } else {
+ warn(s"$req failed on engine side", KyuubiSQLException(resp.getStatus))
+ }
+ } catch {
+ case e: Exception => warn(s"$req failed on engine side", e)
+ }
}
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
index 40a072f..e3fdb9b 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
@@ -19,14 +19,17 @@ package org.apache.kyuubi.operation
import scala.collection.JavaConverters._
-import org.apache.hive.service.rpc.thrift.{TFetchOrientation, TFetchResultsReq, TGetOperationStatusReq}
+import org.apache.hive.service.rpc.thrift.TGetOperationStatusResp
import org.apache.hive.service.rpc.thrift.TOperationState._
+import org.apache.thrift.TException
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.client.KyuubiSyncThriftClient
+import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.events.KyuubiStatementEvent
import org.apache.kyuubi.metrics.MetricsConstants._
import org.apache.kyuubi.metrics.MetricsSystem
+import org.apache.kyuubi.operation.FetchOrientation.FETCH_NEXT
import org.apache.kyuubi.operation.OperationState.OperationState
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.server.EventLoggingService
@@ -50,15 +53,12 @@ class ExecuteStatement(
null
}
- override def getOperationLog: Option[OperationLog] = Option(_operationLog)
-
- private lazy val statusReq = new TGetOperationStatusReq(_remoteOpHandle)
- private lazy val fetchLogReq = {
- val req = new TFetchResultsReq(_remoteOpHandle, TFetchOrientation.FETCH_NEXT, 1000)
- req.setFetchType(1.toShort)
- req
+ private val maxStatusPollOnFailure = {
+ session.sessionManager.getConf.get(KyuubiConf.OPERATION_STATUS_POLLING_MAX_ATTEMPTS)
}
+ override def getOperationLog: Option[OperationLog] = Option(_operationLog)
+
EventLoggingService.onEvent(statementEvent)
override def beforeRun(): Unit = {
@@ -83,7 +83,29 @@ class ExecuteStatement(
private def waitStatementComplete(): Unit = try {
setState(OperationState.RUNNING)
- var statusResp = client.GetOperationStatus(statusReq)
+ var statusResp: TGetOperationStatusResp = null
+ var currentAttempts = 0
+
+ def getOperationStatusWithRetry: Unit = {
+ try {
+ statusResp = client.getOperationStatus(_remoteOpHandle)
+ currentAttempts = 0 // reset attempts whenever get touch with engine again
+ } catch {
+ case e: TException if currentAttempts >= maxStatusPollOnFailure =>
+ error(s"Failed to get ${session.user}'s query[$getHandle] status after" +
+ s" $maxStatusPollOnFailure times, aborting", e)
+ throw e
+ case e: TException =>
+ currentAttempts += 1
+ warn(s"Failed to get ${session.user}'s query[$getHandle] status" +
+ s" ($currentAttempts / $maxStatusPollOnFailure)", e)
+ Thread.sleep(100)
+ }
+ }
+
+ // initialize operation status
+ while (statusResp == null) { getOperationStatusWithRetry }
+
var isComplete = false
while (!isComplete) {
fetchQueryLog()
@@ -94,7 +116,7 @@ class ExecuteStatement(
remoteState match {
case INITIALIZED_STATE | PENDING_STATE | RUNNING_STATE =>
isComplete = false
- statusResp = client.GetOperationStatus(statusReq)
+ getOperationStatusWithRetry
case FINISHED_STATE =>
setState(OperationState.FINISHED)
@@ -109,14 +131,10 @@ class ExecuteStatement(
setState(OperationState.TIMEOUT)
case ERROR_STATE =>
- setState(OperationState.ERROR)
- val ke = KyuubiSQLException(statusResp.getErrorMessage)
- setOperationException(ke)
+ throw KyuubiSQLException(statusResp.getErrorMessage)
case UKNOWN_STATE =>
- setState(OperationState.ERROR)
- val ke = KyuubiSQLException(s"UNKNOWN STATE for $statement")
- setOperationException(ke)
+ throw KyuubiSQLException(s"UNKNOWN STATE for $statement")
}
sendCredentialsIfNeeded()
}
@@ -136,9 +154,8 @@ class ExecuteStatement(
private def fetchQueryLog(): Unit = {
getOperationLog.foreach { logger =>
try {
- val resp = client.FetchResults(fetchLogReq)
- verifyTStatus(resp.getStatus)
- val logs = resp.getResults.getColumns.get(0).getStringVal.getValues.asScala
+ val ret = client.fetchResults(_remoteOpHandle, FETCH_NEXT, 1000, fetchLog = true)
+ val logs = ret.getColumns.get(0).getStringVal.getValues.asScala
logs.foreach(log => logger.write(log + "\n"))
} catch {
case _: Exception => // do nothing
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
index 2568158..dc6ae77 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
@@ -22,9 +22,10 @@ import java.io.IOException
import com.codahale.metrics.MetricRegistry
import org.apache.commons.lang3.StringUtils
import org.apache.hive.service.rpc.thrift._
+import org.apache.thrift.TException
import org.apache.thrift.transport.TTransportException
-import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.{KyuubiSQLException, Utils}
import org.apache.kyuubi.client.KyuubiSyncThriftClient
import org.apache.kyuubi.metrics.MetricsConstants.STATEMENT_FAIL
import org.apache.kyuubi.metrics.MetricsSystem
@@ -64,8 +65,8 @@ abstract class KyuubiOperation(
// https://issues.apache.org/jira/browse/THRIFT-4858
KyuubiSQLException(
s"Error $action $opType: Socket for ${session.handle} is closed", e)
- case _ =>
- KyuubiSQLException(s"Error $action $opType: ${e.getMessage}", e)
+ case e =>
+ KyuubiSQLException(s"Error $action $opType: ${Utils.stringifyException(e)}", e)
}
setOperationException(ke)
throw ke
@@ -86,27 +87,37 @@ abstract class KyuubiOperation(
}
}
- override def cancel(): Unit = {
- if (_remoteOpHandle != null && !isClosedOrCanceled) {
- try {
- client.cancelOperation(_remoteOpHandle)
- setState(OperationState.CANCELED)
- } catch onError("cancelling")
+ override def cancel(): Unit = state.synchronized {
+ if (!isClosedOrCanceled) {
+ setState(OperationState.CANCELED)
+ if (_remoteOpHandle != null) {
+ try {
+ client.cancelOperation(_remoteOpHandle)
+ } catch {
+ case e @ (_: TException | _: KyuubiSQLException) =>
+ warn(s"Error cancelling ${_remoteOpHandle.getOperationId}: ${e.getMessage}", e)
+ }
+ }
}
}
- override def close(): Unit = {
- if (_remoteOpHandle != null && !isClosedOrCanceled) {
- try {
- getOperationLog.foreach(_.close())
- } catch {
- case e: IOException =>
- error(e.getMessage, e)
+ override def close(): Unit = state.synchronized {
+ if (!isClosedOrCanceled) {
+ setState(OperationState.CLOSED)
+ if (_remoteOpHandle != null) {
+ try {
+ getOperationLog.foreach(_.close())
+ } catch {
+ case e: IOException => error(e.getMessage, e)
+ }
+
+ try {
+ client.closeOperation(_remoteOpHandle)
+ } catch {
+ case e @(_: TException | _: KyuubiSQLException) =>
+ warn(s"Error closing ${_remoteOpHandle.getOperationId}: ${e.getMessage}", e)
+ }
}
- try {
- client.closeOperation(_remoteOpHandle)
- setState(OperationState.CLOSED)
- } catch onError("closing")
}
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiThriftBinaryFrontendService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiThriftBinaryFrontendService.scala
index 77d3e60..1c1710e 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiThriftBinaryFrontendService.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiThriftBinaryFrontendService.scala
@@ -22,7 +22,7 @@ import org.apache.kyuubi.service.{Serverable, Service, ThriftBinaryFrontendServi
class KyuubiThriftBinaryFrontendService(
override val serverable: Serverable)
- extends ThriftBinaryFrontendService("KyuubiThriftBinaryFrontendService", serverable) {
+ extends ThriftBinaryFrontendService("KyuubiThriftBinaryFrontendService") {
override lazy val discoveryService: Option[Service] = {
if (ServiceDiscovery.supportServiceDiscovery(conf)) {
@@ -36,4 +36,8 @@ class KyuubiThriftBinaryFrontendService(
checkInitialized()
s"${serverAddr.getCanonicalHostName}:$portNum"
}
+
+ override protected def oomHook: Runnable = {
+ () => serverable.stop()
+ }
}