This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 393d32a30 [KYUUBI #3615] Retry opening the engine when encountering a special error
393d32a30 is described below

commit 393d32a30c8feeb16700299005fbfeb172ffaa85
Author: Fu Chen <[email protected]>
AuthorDate: Thu Oct 20 14:31:35 2022 +0800

    [KYUUBI #3615] Retry opening the engine when encountering a special error
    
    ### _Why are the changes needed?_
    
    to close #3615
    
    steps to reproduce
    
    1. start kyuubi server
    2. connect to the kyuubi server with client beeline and run query `select 1`
    3. force kill the spark application
    4. connect to the kyuubi server with another client **A** immediately
    
    ```
    Error: org.apache.kyuubi.KyuubiSQLException: Error operating LaunchEngine: org.apache.thrift.transport.TTransportException: java.net.ConnectException: Connection refused (Connection refused)
            at org.apache.thrift.transport.TSocket.open(TSocket.java:226)
            at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:266)
            at org.apache.thrift.transport.TSaslClientTransport.open(TSaslClientTransport.java:37)
            at org.apache.kyuubi.client.KyuubiSyncThriftClient$.createTProtocol(KyuubiSyncThriftClient.scala:466)
            at org.apache.kyuubi.client.KyuubiSyncThriftClient$.createClient(KyuubiSyncThriftClient.scala:482)
            at org.apache.kyuubi.session.KyuubiSessionImpl.$anonfun$openEngineSession$1(KyuubiSessionImpl.scala:128)
            at org.apache.kyuubi.session.KyuubiSessionImpl.$anonfun$openEngineSession$1$adapted(KyuubiSessionImpl.scala:113)
            at org.apache.kyuubi.ha.client.DiscoveryClientProvider$.withDiscoveryClient(DiscoveryClientProvider.scala:36)
            at org.apache.kyuubi.session.KyuubiSessionImpl.openEngineSession(KyuubiSessionImpl.scala:113)
            at org.apache.kyuubi.operation.LaunchEngine.$anonfun$runInternal$2(LaunchEngine.scala:60)
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            at java.util.concurrent.FutureTask.run(FutureTask.java:266)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
    Caused by: java.net.ConnectException: Connection refused (Connection refused)
            at java.net.PlainSocketImpl.socketConnect(Native Method)
            at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
            at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
            at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
            at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
            at java.net.Socket.connect(Socket.java:589)
            at org.apache.thrift.transport.TSocket.open(TSocket.java:221)
            ... 14 more (state=,code=0)
    ```
    
    If the engine exits without calling the shutdown hook, its ZNODE (created in EPHEMERAL_SEQUENTIAL mode) is deleted only once the client's disconnect is detected, and the engine is marked as disconnected only after `kyuubi.zookeeper.embedded.max.session.timeout` (the default value is 1 minute). A client that connects within that window may therefore still be routed to the dead engine.
    
    This PR retries opening the engine when encountering a special exception (java.net.ConnectException). Before this PR, client **A** throws the exception above; after this PR, client **A** works properly.
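    
    For reference, the two settings introduced by this PR could be set explicitly as sketched below. Placing them in `kyuubi-defaults.conf` with a `key=value` layout is an assumption about the deployment; the values shown are simply the defaults from this patch. As the loop in this patch reads, these defaults allow at most 10 open attempts separated by 9 waits of 10 seconds, i.e. up to roughly 90 seconds of waiting, which is longer than the 1 minute session timeout mentioned above.
    
    ```
    # Assumed location: kyuubi-defaults.conf (values are the defaults from this patch)
    # Number of retries when opening the engine hits the special error
    kyuubi.session.engine.open.max.attempts=9
    # Wait between retries, as an ISO-8601 duration
    kyuubi.session.engine.open.retry.wait=PT10S
    ```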
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #3618 from cfmcgrady/kyuubi-3615.
    
    Closes #3615
    
    96acd66b [Fu Chen] address comment
    a1395a60 [Fu Chen] fix style
    870216d1 [Fu Chen] Retry opening the engine when encountering a special error
    
    Authored-by: Fu Chen <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 docs/deployment/settings.md                        |  2 +
 .../org/apache/kyuubi/config/KyuubiConf.scala      | 14 +++++++
 .../apache/kyuubi/session/KyuubiSessionImpl.scala  | 45 +++++++++++++++-------
 .../operation/KyuubiOperationPerUserSuite.scala    | 34 ++++++++++++++++
 4 files changed, 82 insertions(+), 13 deletions(-)

diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 8c019d2b1..de61e18ee 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -457,6 +457,8 @@ kyuubi.session.engine.initialize.timeout|PT3M|Timeout for starting the backgroun
 kyuubi.session.engine.launch.async|true|When opening kyuubi session, whether to launch backend engine asynchronously. When true, the Kyuubi server will set up the connection with the client without delay as the backend engine will be created asynchronously.|boolean|1.4.0
 kyuubi.session.engine.log.timeout|PT24H|If we use Spark as the engine then the session submit log is the console output of spark-submit. We will retain the session submit log until over the config value.|duration|1.1.0
 kyuubi.session.engine.login.timeout|PT15S|The timeout of creating the connection to remote sql query engine|duration|1.0.0
+kyuubi.session.engine.open.max.attempts|9|The number of times an open engine will retry when encountering a special error.|int|1.7.0
+kyuubi.session.engine.open.retry.wait|PT10S|How long to wait before retrying to open engine after a failure.|duration|1.7.0
 kyuubi.session.engine.share.level|USER|(deprecated) - Using kyuubi.engine.share.level instead|string|1.0.0
 kyuubi.session.engine.spark.main.resource|&lt;undefined&gt;|The package used to create Spark SQL engine remote application. If it is undefined, Kyuubi will use the default|string|1.0.0
 kyuubi.session.engine.spark.max.lifetime|PT0S|Max lifetime for spark engine, the engine will self-terminate when it reaches the end of life. 0 or negative means not to self-terminate.|duration|1.6.0
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 3779cbe84..93ef7d15d 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -955,6 +955,20 @@ object KyuubiConf {
       .timeConf
       .createWithDefault(Duration.ofSeconds(120).toMillis)
 
+  val ENGINE_OPEN_MAX_ATTEMPTS: ConfigEntry[Int] =
+    buildConf("kyuubi.session.engine.open.max.attempts")
+      .doc("The number of times an open engine will retry when encountering a special error.")
+      .version("1.7.0")
+      .intConf
+      .createWithDefault(9)
+
+  val ENGINE_OPEN_RETRY_WAIT: ConfigEntry[Long] =
+    buildConf("kyuubi.session.engine.open.retry.wait")
+      .doc("How long to wait before retrying to open engine after a failure.")
+      .version("1.7.0")
+      .timeConf
+      .createWithDefault(Duration.ofSeconds(10).toMillis)
+
   val ENGINE_INIT_TIMEOUT: ConfigEntry[Long] = buildConf("kyuubi.session.engine.initialize.timeout")
     .doc("Timeout for starting the background engine, e.g. SparkSQLEngine.")
     .version("1.0.0")
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index 2f96c20f5..c1e17deb3 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -117,26 +117,45 @@ class KyuubiSessionImpl(
         openEngineSessionConf =
           optimizedConf ++ Map(KYUUBI_ENGINE_CREDENTIALS_KEY -> engineCredentials)
       }
-      val (host, port) = engine.getOrCreate(discoveryClient, extraEngineLog)
       val passwd =
         if (sessionManager.getConf.get(ENGINE_SECURITY_ENABLED)) {
           InternalSecurityAccessor.get().issueToken()
         } else {
           Option(password).filter(_.nonEmpty).getOrElse("anonymous")
         }
-      try {
-        _client = KyuubiSyncThriftClient.createClient(user, passwd, host, port, sessionConf)
-        _engineSessionHandle = _client.openSession(protocol, user, passwd, openEngineSessionConf)
-      } catch {
-        case e: Throwable =>
-          error(
-            s"Opening engine [${engine.defaultEngineName} $host:$port]" +
-              s" for $user session failed",
-            e)
-          throw e
+
+      val maxAttempts = sessionManager.getConf.get(ENGINE_OPEN_MAX_ATTEMPTS)
+      val retryWait = sessionManager.getConf.get(ENGINE_OPEN_RETRY_WAIT)
+      var attempt = 0
+      var shouldRetry = true
+      while (attempt <= maxAttempts && shouldRetry) {
+        val (host, port) = engine.getOrCreate(discoveryClient, extraEngineLog)
+        try {
+          _client = KyuubiSyncThriftClient.createClient(user, passwd, host, port, sessionConf)
+          _engineSessionHandle = _client.openSession(protocol, user, passwd, openEngineSessionConf)
+          logSessionInfo(s"Connected to engine [$host:$port]/[${client.engineId.getOrElse("")}]" +
+            s" with ${_engineSessionHandle}]")
+          shouldRetry = false
+        } catch {
+          case e: org.apache.thrift.transport.TTransportException
+              if attempt < maxAttempts && e.getCause.isInstanceOf[java.net.ConnectException] &&
+                e.getCause.getMessage.contains("Connection refused (Connection refused)") =>
+            warn(
+              s"Failed to open [${engine.defaultEngineName} $host:$port] after" +
+                s" $attempt/$maxAttempts times, retrying",
+              e.getCause)
+            Thread.sleep(retryWait)
+            shouldRetry = true
+          case e: Throwable =>
+            error(
+              s"Opening engine [${engine.defaultEngineName} $host:$port]" +
+                s" for $user session failed",
+              e)
+            throw e
+        } finally {
+          attempt += 1
+        }
       }
-      logSessionInfo(s"Connected to engine [$host:$port]/[${client.engineId.getOrElse("")}]" +
-        s" with ${_engineSessionHandle}]")
       sessionEvent.openedTime = System.currentTimeMillis()
       sessionEvent.remoteSessionId = _engineSessionHandle.identifier.toString
       _client.engineId.foreach(e => sessionEvent.engineId = e)
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
index 4cb3f7ef7..88830d80d 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
@@ -29,6 +29,7 @@ import org.apache.kyuubi.config.KyuubiConf.KYUUBI_ENGINE_ENV_PREFIX
 import org.apache.kyuubi.engine.SemanticVersion
 import org.apache.kyuubi.jdbc.hive.KyuubiStatement
 import org.apache.kyuubi.session.{KyuubiSessionImpl, KyuubiSessionManager, SessionHandle}
+import org.apache.kyuubi.zookeeper.ZookeeperConf
 
 class KyuubiOperationPerUserSuite
   extends WithKyuubiServer with SparkQueryTests with WithSimpleDFSService {
@@ -280,4 +281,37 @@ class KyuubiOperationPerUserSuite
       }
     }
   }
+
+  test("the new client should work properly when the engine exits unexpectedly") {
+    assume(!httpMode)
+    withSessionConf(Map(
+      ZookeeperConf.ZK_MAX_SESSION_TIMEOUT.key -> "10000"))(Map.empty)(
+      Map.empty) {
+      withSessionHandle { (client, handle) =>
+        val preReq = new TExecuteStatementReq()
+        preReq.setStatement("SET kyuubi.operation.language=scala")
+        preReq.setSessionHandle(handle)
+        preReq.setRunAsync(false)
+        client.ExecuteStatement(preReq)
+
+        val exitReq = new TExecuteStatementReq()
+        // force kill engine without shutdown hook
+        exitReq.setStatement("java.lang.Runtime.getRuntime().halt(-1)")
+        exitReq.setSessionHandle(handle)
+        exitReq.setRunAsync(true)
+        client.ExecuteStatement(exitReq)
+      }
+      withSessionHandle { (client, handle) =>
+        val preReq = new TExecuteStatementReq()
+        preReq.setStatement("select engine_name()")
+        preReq.setSessionHandle(handle)
+        preReq.setRunAsync(false)
+        val tExecuteStatementResp = client.ExecuteStatement(preReq)
+        val opHandle = tExecuteStatementResp.getOperationHandle
+        waitForOperationToComplete(client, opHandle)
+        assert(tExecuteStatementResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
+      }
+    }
+  }
+
 }
