This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 67d1c5d [KYUUBI #1969] Fix race on some service during start and stop
phase
67d1c5d is described below
commit 67d1c5dd513327b350dac3d6855af5223a594087
Author: Kent Yao <[email protected]>
AuthorDate: Tue Mar 1 10:39:41 2022 +0800
[KYUUBI #1969] Fix race on some service during start and stop phase
### _Why are the changes needed?_
#1969 might be fixed by this.
```
12:18:19.885 SparkTBinaryFrontendHandler-Pool: Thread-1260 INFO
SparkSQLSessionManager: Session stopped due to shared level is Connection.
12:18:19.886 SparkTBinaryFrontendHandler-Pool: Thread-1260 INFO
SparkSQLEngine: Service: [SparkTBinaryFrontend] is stopping.
12:18:19.886 SparkTBinaryFrontendHandler-Pool: Thread-1260 INFO
SparkTBinaryFrontendService: SparkTBinaryFrontend has stopped
12:18:19.886 SparkTBinaryFrontendHandler-Pool: Thread-1260 INFO
SparkTBinaryFrontendService: Service: [EngineServiceDiscovery] is stopping.
12:18:19.890 SparkTBinaryFrontendHandler-Pool: Thread-1260 DEBUG
FailedDeleteManager: Path being added to guaranteed delete set:
/kyuubi/CONNECTION/933147f8-1fcb-4cda-875f-0b219f97aaf3/serviceUri=localhost:43147;version=1.5.0-SNAPSHOT;sequence=0000000000
12:18:19.892 ProcessThread(sid:0 cport:40861): INFO PrepRequestProcessor:
Got user-level KeeperException when processing sessionid:0x100000e6ac00000
type:delete cxid:0xb zxid:0x9 txntype:-1 reqpath:n/a Error
Path:/kyuubi/CONNECTION/933147f8-1fcb-4cda-875f-0b219f97aaf3/serviceUri=localhost:43147;version=1.5.0-SNAPSHOT;sequence=0000000000
Error:KeeperErrorCode = NoNode for
/kyuubi/CONNECTION/933147f8-1fcb-4cda-875f-0b219f97aaf3/serviceUri=localhost:43147;version=1.5.0-SNAPSHOT;sequence=
[...]
12:18:19.892 SyncThread:0 DEBUG FinalRequestProcessor: Processing request::
sessionid:0x100000e6ac00000 type:delete cxid:0xa zxid:0x8 txntype:2 reqpath:n/a
12:18:19.893 SparkTBinaryFrontendHandler-Pool: Thread-1260 ERROR
ServiceDiscoveryClient: Failed to close the persistent ephemeral znodenull
java.io.IOException: java.lang.InterruptedException
at
org.apache.curator.framework.recipes.nodes.PersistentNode.close(PersistentNode.java:296)
~[curator-recipes-2.12.0.jar:?]
at
org.apache.kyuubi.ha.client.zookeeper.ServiceDiscoveryClient.deregisterService(ServiceDiscoveryClient.scala:117)
~[kyuubi-ha_2.12-1.5.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]
at
org.apache.kyuubi.ha.client.EngineServiceDiscovery.stop(EngineServiceDiscovery.scala:35)
~[kyuubi-ha_2.12-1.5.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]
at
org.apache.kyuubi.service.CompositeService.$anonfun$stop$2(CompositeService.scala:75)
~[kyuubi-common_2.12-1.5.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]
```
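For the connection share level, deregistration from service discovery is
triggered by session close and runs on a SparkTBinaryFrontendHandler pool
thread. That pool might be stopped, interrupting the thread, before the
ZooKeeper node deletion finishes, which produces the `InterruptedException`
shown above.
This PR also fixes some other race issues in the start and stop phases.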
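A minimal sketch of the ordering this patch aims for, assuming hypothetical
stand-in classes (`Service`, `RecordingService`, `Frontend`) rather than the
real Kyuubi ones: the frontend starts before the discovery service, and the
discovery service stops before the frontend, so a deregistration running on a
frontend thread is not interrupted midway.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for illustration only; not the Kyuubi classes.
trait Service {
  def start(): Unit
  def stop(): Unit
}

// Records its lifecycle calls so the resulting order can be inspected.
final class RecordingService(name: String, log: ArrayBuffer[String]) extends Service {
  override def start(): Unit = log += s"start $name"
  override def stop(): Unit = log += s"stop $name"
}

final class Frontend(discovery: Service, log: ArrayBuffer[String]) extends Service {
  private val started = new AtomicBoolean(false)

  // Stops only the frontend itself, at most once per successful start.
  private def stopInternal(): Unit =
    if (started.compareAndSet(true, false)) log += "stop frontend"

  // Frontend first, then discovery; roll back the frontend on failure.
  override def start(): Unit = synchronized {
    try {
      if (started.compareAndSet(false, true)) log += "start frontend"
      discovery.start()
    } catch {
      case e: Throwable =>
        stopInternal()
        throw e
    }
  }

  // Discovery first, then the frontend whose threads may run the deregistration.
  override def stop(): Unit = synchronized {
    discovery.stop()
    stopInternal()
  }
}
```

Calling `start()` and then `stop()` on a `Frontend` built this way records the
frontend start, the discovery start, the discovery stop, and the frontend stop,
in that order.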
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #1980 from yaooqinn/conc.
Closes #1969
efd12707 [Kent Yao] address comments
aed6636f [Kent Yao] Fix race on some service during stop phase
63592621 [Kent Yao] Fix race on some service during stop phase
Authored-by: Kent Yao <[email protected]>
Signed-off-by: ulysses-you <[email protected]>
---
.../kyuubi/engine/flink/FlinkSQLEngine.scala | 5 ---
.../events/AbstractEventLoggingService.scala | 2 +-
.../apache/kyuubi/service/TFrontendService.scala | 43 +++++++++++++++++++---
.../org/apache/kyuubi/metrics/MetricsSystem.scala | 2 +-
4 files changed, 39 insertions(+), 13 deletions(-)
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
index a08b461..a242df9 100644
---
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
@@ -58,11 +58,6 @@ case class FlinkSQLEngine(engineContext: DefaultContext)
extends Serverable("Fli
currentEngine.get.stop()
}
}
-
- override def stop(): Unit = {
- super.stop()
- }
-
}
object FlinkSQLEngine extends Logging {
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/events/AbstractEventLoggingService.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/events/AbstractEventLoggingService.scala
index 248d3c7..51de9c8 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/events/AbstractEventLoggingService.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/events/AbstractEventLoggingService.scala
@@ -51,7 +51,7 @@ abstract class AbstractEventLoggingService
object EventLogging {
- private[events] var _service: Option[AbstractEventLoggingService] = None
+ @volatile private[events] var _service: Option[AbstractEventLoggingService]
= None
def onEvent(event: KyuubiEvent): Unit = {
_service.foreach(_.onEvent(event))
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala
index 957ff2b..464901d 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
import scala.language.implicitConversions
import org.apache.hadoop.conf.Configuration
-import org.apache.hive.service.rpc.thrift.{TCancelDelegationTokenReq,
TCancelDelegationTokenResp, TCancelOperationReq, TCancelOperationResp,
TCLIService, TCloseOperationReq, TCloseOperationResp, TCloseSessionReq,
TCloseSessionResp, TExecuteStatementReq, TExecuteStatementResp,
TFetchResultsReq, TFetchResultsResp, TGetCatalogsReq, TGetCatalogsResp,
TGetColumnsReq, TGetColumnsResp, TGetCrossReferenceReq, TGetCrossReferenceResp,
TGetDelegationTokenReq, TGetDelegationTokenResp, TGetFunctionsR [...]
+import org.apache.hive.service.rpc.thrift._
import org.apache.thrift.protocol.TProtocol
import org.apache.thrift.server.{ServerContext, TServerEventHandler}
import org.apache.thrift.transport.TTransport
@@ -56,22 +56,53 @@ abstract class TFrontendService(name: String)
protected lazy val authFactory: KyuubiAuthenticationFactory =
new KyuubiAuthenticationFactory(conf, isServer())
+ /**
+ * Start the service itself(FE) and its composited (Discovery service, DS)
in the order of:
+ * Start FE ->
+ * if (success) -> Continue starting DS
+ * if (success) -> finish
+ * else -> Stop DS -> Raise Error -> Stop FE -> Raise Error
+ * else
+ * Raise Error -> Stop FE -> Raise Error
+ * This makes sure that the FE has started and ready to serve before
exposing through DS.
+ */
override def start(): Unit = synchronized {
- super.start()
- if (!started.getAndSet(true)) {
- serverThread.start()
+ try {
+ if (started.compareAndSet(false, true)) {
+ serverThread.start()
+ }
+ super.start()
+ } catch {
+ case e: Throwable =>
+ stopInternal()
+ throw e
}
}
protected def stopServer(): Unit
- override def stop(): Unit = synchronized {
- if (started.getAndSet(false)) {
+ /**
+ * Inner stop progress that will not stop all services composited with this.
+ */
+ private def stopInternal(): Unit = {
+ if (started.compareAndSet(true, false)) {
serverThread.interrupt()
stopServer()
info(getName + " has stopped")
}
+ }
+
+ /**
+ * Stop the service itself(FE) and its composited (Discovery service, DS) in
the order of:
+ * Stop DS -> Stop FE
+ * This makes sure of
+ * 1. The service stop serving before terminating during stopping
+ * 2. For engines with group share level, the DS stopping is invoked by a
pool in FE,
+ * so we need to stop DS first in case of interrupting.
+ */
+ override def stop(): Unit = synchronized {
super.stop()
+ stopInternal()
}
override def connectionUrl: String = {
diff --git
a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala
b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala
index 3622ff8..9533a63 100644
---
a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala
+++
b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala
@@ -98,7 +98,7 @@ class MetricsSystem extends CompositeService("MetricsSystem")
{
object MetricsSystem {
- private var maybeSystem: Option[MetricsSystem] = None
+ @volatile private var maybeSystem: Option[MetricsSystem] = None
def tracing[T](func: MetricsSystem => T): Unit = {
maybeSystem.foreach(func(_))