This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new c210fdae6 [KYUUBI #2686] Fix lock bug if engine initialization timeout
c210fdae6 is described below

commit c210fdae69f2c0a8178c75e732426541f6556767
Author: ulysses-you <[email protected]>
AuthorDate: Thu May 19 16:27:34 2022 +0800

    [KYUUBI #2686] Fix lock bug if engine initialization timeout
    
    ### _Why are the changes needed?_
    
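    closes #2686
    
    Previously `tryWithLock` ignored the result of `InterProcessSemaphoreMutex.acquire`, so a
    request that timed out while waiting for the engine lock still ran the lock-guarded code
    (which may create an engine) without holding the lock. Now a `KyuubiSQLException` is
    thrown when the lock cannot be acquired in time. The lock path also includes the engine
    type, so different engine types no longer share one lock. Finally, the lock wait is padded
    by 10% (`LOCK_TIMEOUT_SPAN_FACTOR`) to tolerate small timeout gaps between Kyuubi instances.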
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #2687 from ulysses-you/lock.
    
    Closes #2686
    
    e5dcacdb [ulysses-you] finally
    868f95b3 [ulysses-you] address comment
    1ba65381 [ulysses-you] flaky test
    11fad96c [ulysses-you] Fix lock bug if engine initialization timeout
    
    Authored-by: ulysses-you <[email protected]>
    Signed-off-by: ulysses-you <[email protected]>
---
 .../apache/kyuubi/ha/client/DiscoveryClient.scala  |  9 +-
 .../zookeeper/ZookeeperDiscoveryClient.scala       | 34 ++++++--
 .../org/apache/kyuubi/metrics/MetricsSystem.scala  |  4 +
 .../scala/org/apache/kyuubi/engine/EngineRef.scala | 10 ++-
 .../org/apache/kyuubi/engine/EngineRefSuite.scala  | 95 ++++++++++++++++++++++
 5 files changed, 138 insertions(+), 14 deletions(-)

diff --git 
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClient.scala 
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClient.scala
index 1a390d418..c20799a1c 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClient.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClient.scala
@@ -17,8 +17,6 @@
 
 package org.apache.kyuubi.ha.client
 
-import java.util.concurrent.TimeUnit
-
 import org.apache.kyuubi.Logging
 import org.apache.kyuubi.config.KyuubiConf
 
@@ -79,11 +77,10 @@ trait DiscoveryClient extends Logging {
   /**
    * The distributed lock path used to ensure only once engine being created 
for non-CONNECTION
    * share level.
+   * @param timeout the timeout of acquiring lock, unit is ms
+   * @throws KyuubiSQLException if timed out or any exception is thrown while acquiring the lock
    */
-  def tryWithLock[T](
-      lockPath: String,
-      timeout: Long,
-      unit: TimeUnit = TimeUnit.MILLISECONDS)(f: => T): T
+  def tryWithLock[T](lockPath: String, timeout: Long)(f: => T): T
 
   /**
    * Get the engine address and port from engine space.
diff --git 
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala
 
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala
index cb1c52945..d1ce32685 100644
--- 
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala
+++ 
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala
@@ -134,18 +134,40 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends 
DiscoveryClient {
       })
   }
 
-  def tryWithLock[T](
-      lockPath: String,
-      timeout: Long,
-      unit: TimeUnit = TimeUnit.MILLISECONDS)(f: => T): T = {
+  def tryWithLock[T](lockPath: String, timeout: Long)(f: => T): T = {
     var lock: InterProcessSemaphoreMutex = null
     try {
       try {
         lock = new InterProcessSemaphoreMutex(zkClient, lockPath)
         // Acquire a lease. If no leases are available, this method blocks 
until either the
-        // maximum number of leases is increased or another client/process 
closes a lease
-        lock.acquire(timeout, unit)
+        // maximum number of leases is increased or another client/process 
closes a lease.
+        //
+        // Here, we should throw an exception if we time out while acquiring the lock.
+        // Let's say three clients request the same lock through two Kyuubi server instances.
+        //
+        //  client A  --->  kyuubi X  -- first acquired  \
+        //  client B  --->  kyuubi X  -- second acquired --  zookeeper
+        //  client C  --->  kyuubi Y  -- third acquired  /
+        //
+        // The first client A acquired the lock then B and C are blocked until A releases the lock,
+        // with the A created engine state:
+        //   - SUCCESS
+        //     B acquired the lock then get engine ref and release the lock.
+        //     C acquired the lock then get engine ref and release the lock.
+        //   - FAILED or TIMEOUT
+        //     B acquired the lock then tries to create the engine again if not timed out.
+        //     C should time out and throw an exception back to the client. This fails fast
+        //     to avoid clients waiting too long when requests are concurrent.
+
+        // Return false means we are timeout
+        val acquired = lock.acquire(timeout, TimeUnit.MILLISECONDS)
+        if (!acquired) {
+          throw KyuubiSQLException(s"Timeout to lock on path [$lockPath] after 
" +
+            s"$timeout ms. There would be some problem that other session may 
" +
+            s"create engine timeout.")
+        }
       } catch {
+        case e: KyuubiSQLException => throw e
         case e: Exception => throw KyuubiSQLException(s"Lock failed on path 
[$lockPath]", e)
       }
       f
diff --git 
a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala 
b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala
index 9533a633d..41ef03710 100644
--- 
a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala
+++ 
b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala
@@ -113,4 +113,8 @@ object MetricsSystem {
       tracing(_.updateTimer(name, System.nanoTime() - startTime, 
TimeUnit.NANOSECONDS))
     }
   }
+
+  def counterValue(name: String): Option[Long] = {
+    maybeSystem.map(_.registry.counter(name).getCount)
+  }
 }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index 2e4958583..cfeb0aaf3 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -71,6 +71,10 @@ private[kyuubi] class EngineRef(
 
   private val clientPoolName: String = conf.get(ENGINE_POOL_NAME)
 
+  // In case multiple Kyuubi instances have small gaps in their timeouts, add
+  // a small amount of extra time to the lock timeout
+  private val LOCK_TIMEOUT_SPAN_FACTOR = 0.1
+
   private var builder: ProcBuilder = _
 
   @VisibleForTesting
@@ -147,10 +151,12 @@ private[kyuubi] class EngineRef(
       case _ =>
         val lockPath =
           DiscoveryPaths.makePath(
-            s"${serverSpace}_$shareLevel",
+            s"${serverSpace}_${shareLevel}_$engineType",
             "lock",
             Array(appUser, subdomain))
-        discoveryClient.tryWithLock(lockPath, timeout, 
TimeUnit.MILLISECONDS)(f)
+        discoveryClient.tryWithLock(
+          lockPath,
+          timeout + (LOCK_TIMEOUT_SPAN_FACTOR * timeout).toLong)(f)
     }
 
   private def create(
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala
index 548e4f13d..9591c48b6 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.kyuubi.engine
 
 import java.util.UUID
+import java.util.concurrent.Executors
 
 import org.apache.hadoop.security.UserGroupInformation
 import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
@@ -28,6 +29,8 @@ import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.ha.HighAvailabilityConf
 import org.apache.kyuubi.ha.client.DiscoveryClientProvider
 import org.apache.kyuubi.ha.client.DiscoveryPaths
+import org.apache.kyuubi.metrics.MetricsConstants.ENGINE_TOTAL
+import org.apache.kyuubi.metrics.MetricsSystem
 import org.apache.kyuubi.util.NamedThreadFactory
 import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
 
@@ -37,6 +40,7 @@ class EngineRefSuite extends KyuubiFunSuite {
   private val zkServer = new EmbeddedZookeeper
   private val conf = KyuubiConf()
   private val user = Utils.currentUser
+  private val metricsSystem = new MetricsSystem
 
   override def beforeAll(): Unit = {
     val zkData = Utils.createTempDir()
@@ -45,10 +49,13 @@ class EngineRefSuite extends KyuubiFunSuite {
       .set("spark.sql.catalogImplementation", "in-memory")
     zkServer.initialize(conf)
     zkServer.start()
+    metricsSystem.initialize(conf)
+    metricsSystem.start()
     super.beforeAll()
   }
 
   override def afterAll(): Unit = {
+    metricsSystem.stop()
     zkServer.stop()
     super.afterAll()
   }
@@ -56,6 +63,8 @@ class EngineRefSuite extends KyuubiFunSuite {
   override def beforeEach(): Unit = {
     conf.unset(KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN)
     conf.unset(KyuubiConf.ENGINE_SHARE_LEVEL_SUB_DOMAIN)
+    conf.unset(KyuubiConf.ENGINE_POOL_SIZE)
+    conf.unset(KyuubiConf.ENGINE_POOL_NAME)
     super.beforeEach()
   }
 
@@ -247,4 +256,90 @@ class EngineRefSuite extends KyuubiFunSuite {
       assert(port2 == port1, "engine shared")
     }
   }
+
+  test("different engine type should use its own lock") {
+    conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
+    conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
+    conf.set(KyuubiConf.ENGINE_INIT_TIMEOUT, 3000L)
+    conf.set(HighAvailabilityConf.HA_ZK_NAMESPACE, "engine_test1")
+    conf.set(HighAvailabilityConf.HA_ZK_QUORUM, zkServer.getConnectString)
+    val conf1 = conf.clone
+    conf1.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString)
+    val conf2 = conf.clone
+    conf2.set(KyuubiConf.ENGINE_TYPE, HIVE_SQL.toString)
+
+    val start = System.currentTimeMillis()
+    val times = new Array[Long](2)
+    val executor = Executors.newFixedThreadPool(2)
+    try {
+      executor.execute(() => {
+        DiscoveryClientProvider.withDiscoveryClient(conf1) { client =>
+          try {
+            new EngineRef(conf1, user, UUID.randomUUID().toString, null)
+              .getOrCreate(client)
+          } finally {
+            times(0) = System.currentTimeMillis()
+          }
+        }
+      })
+      executor.execute(() => {
+        DiscoveryClientProvider.withDiscoveryClient(conf2) { client =>
+          try {
+            new EngineRef(conf2, user, UUID.randomUUID().toString, null)
+              .getOrCreate(client)
+          } finally {
+            times(1) = System.currentTimeMillis()
+          }
+        }
+      })
+
+      eventually(timeout(10.seconds), interval(200.milliseconds)) {
+        assert(times.forall(_ > start))
+        // ENGINE_INIT_TIMEOUT is 3000ms
+        assert(times.max - times.min < 2500)
+      }
+    } finally {
+      executor.shutdown()
+    }
+  }
+
+  test("three same lock request with initialization timeout") {
+    val id = UUID.randomUUID().toString
+    conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
+    conf.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString)
+    conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
+    conf.set(KyuubiConf.ENGINE_INIT_TIMEOUT, 3000L)
+    conf.set(HighAvailabilityConf.HA_ZK_NAMESPACE, "engine_test2")
+    conf.set(HighAvailabilityConf.HA_ZK_QUORUM, zkServer.getConnectString)
+
+    val beforeEngines = MetricsSystem.counterValue(ENGINE_TOTAL).getOrElse(0L)
+    val start = System.currentTimeMillis()
+    val times = new Array[Long](3)
+    val executor = Executors.newFixedThreadPool(3)
+    try {
+      (0 until (3)).foreach { i =>
+        val cloned = conf.clone
+        executor.execute(() => {
+          DiscoveryClientProvider.withDiscoveryClient(cloned) { client =>
+            try {
+              new EngineRef(cloned, user, id, null).getOrCreate(client)
+            } finally {
+              times(i) = System.currentTimeMillis()
+            }
+          }
+        })
+      }
+
+      eventually(timeout(20.seconds), interval(200.milliseconds)) {
+        assert(times.forall(_ > start))
+        // ENGINE_INIT_TIMEOUT is 3000ms
+        assert(times.max - times.min > 2800)
+      }
+
+      // we should only submit two engines, the last request should time out and fail
+      assert(MetricsSystem.counterValue(ENGINE_TOTAL).get - beforeEngines == 2)
+    } finally {
+      executor.shutdown()
+    }
+  }
 }

Reply via email to