This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 67d1c5d  [KYUUBI #1969] Fix race on some service during start and stop phase
67d1c5d is described below

commit 67d1c5dd513327b350dac3d6855af5223a594087
Author: Kent Yao <[email protected]>
AuthorDate: Tue Mar 1 10:39:41 2022 +0800

    [KYUUBI #1969] Fix race on some service during start and stop phase
    
    ### _Why are the changes needed?_
    
    This might fix #1969.
    
    ```
    12:18:19.885 SparkTBinaryFrontendHandler-Pool: Thread-1260 INFO SparkSQLSessionManager: Session stopped due to shared level is Connection.
    12:18:19.886 SparkTBinaryFrontendHandler-Pool: Thread-1260 INFO SparkSQLEngine: Service: [SparkTBinaryFrontend] is stopping.
    12:18:19.886 SparkTBinaryFrontendHandler-Pool: Thread-1260 INFO SparkTBinaryFrontendService: SparkTBinaryFrontend has stopped
    12:18:19.886 SparkTBinaryFrontendHandler-Pool: Thread-1260 INFO SparkTBinaryFrontendService: Service: [EngineServiceDiscovery] is stopping.
    12:18:19.890 SparkTBinaryFrontendHandler-Pool: Thread-1260 DEBUG FailedDeleteManager: Path being added to guaranteed delete set: /kyuubi/CONNECTION/933147f8-1fcb-4cda-875f-0b219f97aaf3/serviceUri=localhost:43147;version=1.5.0-SNAPSHOT;sequence=0000000000
    12:18:19.892 ProcessThread(sid:0 cport:40861): INFO PrepRequestProcessor: Got user-level KeeperException when processing sessionid:0x100000e6ac00000 type:delete cxid:0xb zxid:0x9 txntype:-1 reqpath:n/a Error Path:/kyuubi/CONNECTION/933147f8-1fcb-4cda-875f-0b219f97aaf3/serviceUri=localhost:43147;version=1.5.0-SNAPSHOT;sequence=0000000000 Error:KeeperErrorCode = NoNode for /kyuubi/CONNECTION/933147f8-1fcb-4cda-875f-0b219f97aaf3/serviceUri=localhost:43147;version=1.5.0-SNAPSHOT;sequence= [...]
    12:18:19.892 SyncThread:0 DEBUG FinalRequestProcessor: Processing request:: sessionid:0x100000e6ac00000 type:delete cxid:0xa zxid:0x8 txntype:2 reqpath:n/a
    12:18:19.893 SparkTBinaryFrontendHandler-Pool: Thread-1260 ERROR ServiceDiscoveryClient: Failed to close the persistent ephemeral znodenull
    java.io.IOException: java.lang.InterruptedException
        at org.apache.curator.framework.recipes.nodes.PersistentNode.close(PersistentNode.java:296) ~[curator-recipes-2.12.0.jar:?]
        at org.apache.kyuubi.ha.client.zookeeper.ServiceDiscoveryClient.deregisterService(ServiceDiscoveryClient.scala:117) ~[kyuubi-ha_2.12-1.5.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]
        at org.apache.kyuubi.ha.client.EngineServiceDiscovery.stop(EngineServiceDiscovery.scala:35) ~[kyuubi-ha_2.12-1.5.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]
        at org.apache.kyuubi.service.CompositeService.$anonfun$stop$2(CompositeService.scala:75) ~[kyuubi-common_2.12-1.5.0-SNAPSHOT.jar:1.5.0-SNAPSHOT]
    ```
    
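    For the CONNECTION share level, the engine deregisters itself from ZooKeeper when the session closes, and this runs on a thread of the SparkTBinaryFrontendHandler pool. That pool might be stopped before the znode deletion finishes, which interrupts the deregistration (the `java.lang.InterruptedException` above). This patch therefore stops the discovery service (DS) before the frontend (FE) on stop, and on start only exposes the FE through DS once the FE itself is up.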
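
    The stop-order race can be reproduced outside Kyuubi. The sketch below is a minimal, hypothetical model (`StopOrderDemo`, `deregister` and `closeSession` are not Kyuubi APIs): it assumes that stopping the frontend interrupts the handler pool's worker threads, approximates that with `ExecutorService.shutdownNow()`, and stands in for the znode deletion with a blocking `Thread.sleep`.

    ```scala
    import java.util.concurrent.{Callable, Executors, TimeUnit}

    object StopOrderDemo {
      // Hypothetical stand-in for the ZooKeeper znode deletion: a blocking call
      // that throws InterruptedException if the calling thread is interrupted.
      def deregister(): Unit = Thread.sleep(50)

      // Runs a "session close" on a pool thread that also shuts that pool down,
      // and reports whether the deregistration completed without interruption.
      def closeSession(discoveryFirst: Boolean): Boolean = {
        val pool = Executors.newFixedThreadPool(1)
        val outcome = pool.submit(new Callable[Boolean] {
          override def call(): Boolean =
            try {
              if (discoveryFirst) {
                deregister()       // stop DS first, while this thread is not interrupted
                pool.shutdownNow() // then stop FE; interrupting this worker is now harmless
              } else {
                pool.shutdownNow() // stop FE first: shutdownNow() interrupts this worker too
                deregister()       // so the blocking call fails immediately
              }
              true
            } catch {
              case _: InterruptedException => false
            }
        })
        try outcome.get(5, TimeUnit.SECONDS)
        finally pool.shutdownNow()
      }
    }
    ```

    With `discoveryFirst = false` (FE stopped before DS, as in the old `stop()`), `closeSession` returns `false`; with `discoveryFirst = true` (the order the new `TFrontendService.stop()` uses) it returns `true`.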
    
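    This PR also fixes some other races, e.g. marking the shared `EventLogging._service` and `MetricsSystem.maybeSystem` holders as `@volatile` so that updates to them are visible across threads.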
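
    The `@volatile` changes target a visibility race: a holder such as `MetricsSystem.maybeSystem` or `EventLogging._service` may be written by one thread (for example during start or stop) and read by others, and without `@volatile` or other synchronization the Java memory model does not guarantee that readers ever see the new value. Below is a minimal sketch of that pattern with hypothetical names (`RegistryHolder`, `Registry`, `publishAndObserve`); it is not Kyuubi's code.

    ```scala
    import java.util.concurrent.atomic.AtomicReference

    // Hypothetical holder shaped like EventLogging._service / MetricsSystem.maybeSystem.
    object RegistryHolder {
      final class Registry(val name: String)

      // @volatile makes every assignment visible to reader threads; without it a
      // reader spinning on this field may keep seeing a stale None indefinitely.
      @volatile private var maybeRegistry: Option[Registry] = None

      def register(r: Registry): Unit = maybeRegistry = Some(r)
      def unregister(): Unit = maybeRegistry = None

      // Reads the field exactly once, so a concurrent unregister() cannot
      // change the value halfway through this call.
      def tracing[T](f: Registry => T): Option[T] = maybeRegistry.map(f)

      // Starts a reader that spins until it observes a registration, publishes
      // one from the calling thread, and returns the name the reader saw.
      def publishAndObserve(name: String, timeoutMs: Long): Option[String] = {
        unregister()
        val seen = new AtomicReference[String]()
        val reader = new Thread(() => {
          var observed: Option[String] = None
          while (observed.isEmpty) observed = tracing(_.name)
          seen.set(observed.get)
        })
        reader.setDaemon(true) // do not keep the JVM alive if the read never succeeds
        reader.start()
        register(new Registry(name))
        reader.join(timeoutMs)
        Option(seen.get)
      }
    }
    ```

    `RegistryHolder.publishAndObserve("metrics", 5000L)` returns `Some("metrics")`: the volatile write in `register` is guaranteed to become visible to the spinning reader.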
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #1980 from yaooqinn/conc.
    
    Closes #1969
    
    efd12707 [Kent Yao] address comments
    aed6636f [Kent Yao] Fix race on some service during stop phase
    63592621 [Kent Yao] Fix race on some service during stop phase
    
    Authored-by: Kent Yao <[email protected]>
    Signed-off-by: ulysses-you <[email protected]>
---
 .../kyuubi/engine/flink/FlinkSQLEngine.scala       |  5 ---
 .../events/AbstractEventLoggingService.scala       |  2 +-
 .../apache/kyuubi/service/TFrontendService.scala   | 43 +++++++++++++++++++---
 .../org/apache/kyuubi/metrics/MetricsSystem.scala  |  2 +-
 4 files changed, 39 insertions(+), 13 deletions(-)

diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
index a08b461..a242df9 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
@@ -58,11 +58,6 @@ case class FlinkSQLEngine(engineContext: DefaultContext) extends Serverable("Fli
       currentEngine.get.stop()
     }
   }
-
-  override def stop(): Unit = {
-    super.stop()
-  }
-
 }
 
 object FlinkSQLEngine extends Logging {
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/events/AbstractEventLoggingService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/events/AbstractEventLoggingService.scala
index 248d3c7..51de9c8 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/events/AbstractEventLoggingService.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/events/AbstractEventLoggingService.scala
@@ -51,7 +51,7 @@ abstract class AbstractEventLoggingService
 
 object EventLogging {
 
-  private[events] var _service: Option[AbstractEventLoggingService] = None
+  @volatile private[events] var _service: Option[AbstractEventLoggingService] = None
 
   def onEvent(event: KyuubiEvent): Unit = {
     _service.foreach(_.onEvent(event))
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala
index 957ff2b..464901d 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
 import scala.language.implicitConversions
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hive.service.rpc.thrift.{TCancelDelegationTokenReq, TCancelDelegationTokenResp, TCancelOperationReq, TCancelOperationResp, TCLIService, TCloseOperationReq, TCloseOperationResp, TCloseSessionReq, TCloseSessionResp, TExecuteStatementReq, TExecuteStatementResp, TFetchResultsReq, TFetchResultsResp, TGetCatalogsReq, TGetCatalogsResp, TGetColumnsReq, TGetColumnsResp, TGetCrossReferenceReq, TGetCrossReferenceResp, TGetDelegationTokenReq, TGetDelegationTokenResp, TGetFunctionsR [...]
+import org.apache.hive.service.rpc.thrift._
 import org.apache.thrift.protocol.TProtocol
 import org.apache.thrift.server.{ServerContext, TServerEventHandler}
 import org.apache.thrift.transport.TTransport
@@ -56,22 +56,53 @@ abstract class TFrontendService(name: String)
   protected lazy val authFactory: KyuubiAuthenticationFactory =
     new KyuubiAuthenticationFactory(conf, isServer())
 
+  /**
+   * Start the service itself(FE) and its composited (Discovery service, DS) in the order of:
+   *   Start FE ->
+   *     if (success) -> Continue starting DS
+   *       if (success) -> finish
+   *       else -> Stop DS -> Raise Error -> Stop FE -> Raise Error
+   *     else
+   *       Raise Error -> Stop FE -> Raise Error
+   *    This makes sure that the FE has started and ready to serve before exposing through DS.
+   */
   override def start(): Unit = synchronized {
-    super.start()
-    if (!started.getAndSet(true)) {
-      serverThread.start()
+    try {
+      if (started.compareAndSet(false, true)) {
+        serverThread.start()
+      }
+      super.start()
+    } catch {
+      case e: Throwable =>
+        stopInternal()
+        throw e
     }
   }
 
   protected def stopServer(): Unit
 
-  override def stop(): Unit = synchronized {
-    if (started.getAndSet(false)) {
+  /**
+   * Inner stop progress that will not stop all services composited with this.
+   */
+  private def stopInternal(): Unit = {
+    if (started.compareAndSet(true, false)) {
       serverThread.interrupt()
       stopServer()
       info(getName + " has stopped")
     }
+  }
+
+  /**
+   * Stop the service itself(FE) and its composited (Discovery service, DS) in the order of:
+   *   Stop DS -> Stop FE
+   * This makes sure of
+   *   1. The service stop serving before terminating during stopping
+   *   2. For engines with group share level, the DS stopping is invoked by a pool in FE,
+   *   so we need to stop DS first in case of interrupting.
+   */
+  override def stop(): Unit = synchronized {
     super.stop()
+    stopInternal()
   }
 
   override def connectionUrl: String = {
diff --git a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala
index 3622ff8..9533a63 100644
--- a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala
+++ b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala
@@ -98,7 +98,7 @@ class MetricsSystem extends CompositeService("MetricsSystem") {
 
 object MetricsSystem {
 
-  private var maybeSystem: Option[MetricsSystem] = None
+  @volatile private var maybeSystem: Option[MetricsSystem] = None
 
   def tracing[T](func: MetricsSystem => T): Unit = {
     maybeSystem.foreach(func(_))
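
The start/stop ordering documented in the new `TFrontendService` scaladoc can be sketched in isolation. This is a simplified, hypothetical model (`StartStopOrderSketch`, `Frontend`, `Discovery` and the `events` log are not Kyuubi classes): the frontend starts serving before its discovery child starts, rolls only itself back if a child fails to start, and on `stop()` stops its children before it stops serving. For brevity it skips stopping a child whose own start failed.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ArrayBuffer

object StartStopOrderSketch {
  // Records lifecycle transitions so the ordering can be inspected.
  val events: ArrayBuffer[String] = ArrayBuffer.empty[String]

  trait Service {
    def start(): Unit
    def stop(): Unit
  }

  // Hypothetical discovery service (DS); can be told to fail on start.
  final class Discovery(failOnStart: Boolean) extends Service {
    override def start(): Unit = {
      if (failOnStart) throw new IllegalStateException("DS failed to start")
      events += "DS started"
    }
    override def stop(): Unit = events += "DS stopped"
  }

  // Hypothetical frontend (FE) composed with its child services.
  final class Frontend(children: Seq[Service]) extends Service {
    private val started = new AtomicBoolean(false)

    override def start(): Unit = synchronized {
      try {
        // Serve first, so the FE is ready before DS advertises it.
        if (started.compareAndSet(false, true)) events += "FE serving"
        children.foreach(_.start())
      } catch {
        case e: Throwable =>
          stopInternal() // roll back the FE only, then propagate the failure
          throw e
      }
    }

    // Stops the FE without touching its children.
    private def stopInternal(): Unit =
      if (started.compareAndSet(true, false)) events += "FE stopped"

    override def stop(): Unit = synchronized {
      children.reverse.foreach(_.stop()) // withdraw from discovery first...
      stopInternal()                     // ...then stop serving
    }
  }
}
```

Starting and then stopping a `Frontend` with a healthy `Discovery` records `FE serving`, `DS started`, `DS stopped`, `FE stopped`; with a failing `Discovery`, `start()` rethrows and records only `FE serving`, `FE stopped`.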
