This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new c210fdae6 [KYUUBI #2686] Fix lock bug if engine initialization timeout
c210fdae6 is described below
commit c210fdae69f2c0a8178c75e732426541f6556767
Author: ulysses-you <[email protected]>
AuthorDate: Thu May 19 16:27:34 2022 +0800
[KYUUBI #2686] Fix lock bug if engine initialization timeout
### _Why are the changes needed?_
closes #2686
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #2687 from ulysses-you/lock.
Closes #2686
e5dcacdb [ulysses-you] finally
868f95b3 [ulysses-you] address comment
1ba65381 [ulysses-you] flaky test
11fad96c [ulysses-you] Fix lock bug if engine initialization timeout
Authored-by: ulysses-you <[email protected]>
Signed-off-by: ulysses-you <[email protected]>
---
.../apache/kyuubi/ha/client/DiscoveryClient.scala | 9 +-
.../zookeeper/ZookeeperDiscoveryClient.scala | 34 ++++++--
.../org/apache/kyuubi/metrics/MetricsSystem.scala | 4 +
.../scala/org/apache/kyuubi/engine/EngineRef.scala | 10 ++-
.../org/apache/kyuubi/engine/EngineRefSuite.scala | 95 ++++++++++++++++++++++
5 files changed, 138 insertions(+), 14 deletions(-)
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClient.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClient.scala
index 1a390d418..c20799a1c 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClient.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClient.scala
@@ -17,8 +17,6 @@
package org.apache.kyuubi.ha.client
-import java.util.concurrent.TimeUnit
-
import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf
@@ -79,11 +77,10 @@ trait DiscoveryClient extends Logging {
/**
* The distributed lock path used to ensure only once engine being created
for non-CONNECTION
* share level.
+ * @param timeout the timeout of acquiring lock, unit is ms
+ * @throws KyuubiSQLException if timeout or get any exception during
acquiring lock
*/
- def tryWithLock[T](
- lockPath: String,
- timeout: Long,
- unit: TimeUnit = TimeUnit.MILLISECONDS)(f: => T): T
+ def tryWithLock[T](lockPath: String, timeout: Long)(f: => T): T
/**
* Get the engine address and port from engine space.
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala
index cb1c52945..d1ce32685 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala
@@ -134,18 +134,40 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
})
}
- def tryWithLock[T](
- lockPath: String,
- timeout: Long,
- unit: TimeUnit = TimeUnit.MILLISECONDS)(f: => T): T = {
+ def tryWithLock[T](lockPath: String, timeout: Long)(f: => T): T = {
var lock: InterProcessSemaphoreMutex = null
try {
try {
lock = new InterProcessSemaphoreMutex(zkClient, lockPath)
// Acquire a lease. If no leases are available, this method blocks
until either the
- // maximum number of leases is increased or another client/process
closes a lease
- lock.acquire(timeout, unit)
+ // maximum number of leases is increased or another client/process
closes a lease.
+ //
+ // Here, we should throw exception if timeout during acquiring lock.
+ // Let's say we have three clients with same request lock to two
kyuubi server instances.
+ //
+ // client A ---> kyuubi X -- first acquired \
+ // client B ---> kyuubi X -- second acquired -- zookeeper
+ // client C ---> kyuubi Y -- third acquired /
+ //
+ // The first client A acqiured the lock then B and C are blocked until
A release the lock,
+ // with the A created engine state:
+ // - SUCCESS
+ // B acquired the lock then get engine ref and release the lock.
+ // C acquired the lock then get engine ref and release the lock.
+ // - FAILED or TIMEOUT
+ // B acquired the lock then try to create engine again if not
timeout.
+ // C should be timeout and throw exception back to client. This
fast fail
+ // to avoid client too long to waiting in concurrent.
+
+ // Return false means we are timeout
+ val acquired = lock.acquire(timeout, TimeUnit.MILLISECONDS)
+ if (!acquired) {
+ throw KyuubiSQLException(s"Timeout to lock on path [$lockPath] after
" +
+ s"$timeout ms. There would be some problem that other session may
" +
+ s"create engine timeout.")
+ }
} catch {
+ case e: KyuubiSQLException => throw e
case e: Exception => throw KyuubiSQLException(s"Lock failed on path
[$lockPath]", e)
}
f
diff --git a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala
index 9533a633d..41ef03710 100644
--- a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala
+++ b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsSystem.scala
@@ -113,4 +113,8 @@ object MetricsSystem {
tracing(_.updateTimer(name, System.nanoTime() - startTime,
TimeUnit.NANOSECONDS))
}
}
+
+ def counterValue(name: String): Option[Long] = {
+ maybeSystem.map(_.registry.counter(name).getCount)
+ }
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index 2e4958583..cfeb0aaf3 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -71,6 +71,10 @@ private[kyuubi] class EngineRef(
private val clientPoolName: String = conf.get(ENGINE_POOL_NAME)
+ // In case the multi kyuubi instances have the small gap of timeout, here we
add
+ // a small amount of time for timeout
+ private val LOCK_TIMEOUT_SPAN_FACTOR = 0.1
+
private var builder: ProcBuilder = _
@VisibleForTesting
@@ -147,10 +151,12 @@ private[kyuubi] class EngineRef(
case _ =>
val lockPath =
DiscoveryPaths.makePath(
- s"${serverSpace}_$shareLevel",
+ s"${serverSpace}_${shareLevel}_$engineType",
"lock",
Array(appUser, subdomain))
- discoveryClient.tryWithLock(lockPath, timeout,
TimeUnit.MILLISECONDS)(f)
+ discoveryClient.tryWithLock(
+ lockPath,
+ timeout + (LOCK_TIMEOUT_SPAN_FACTOR * timeout).toLong)(f)
}
private def create(
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala
index 548e4f13d..9591c48b6 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala
@@ -18,6 +18,7 @@
package org.apache.kyuubi.engine
import java.util.UUID
+import java.util.concurrent.Executors
import org.apache.hadoop.security.UserGroupInformation
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
@@ -28,6 +29,8 @@ import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.ha.HighAvailabilityConf
import org.apache.kyuubi.ha.client.DiscoveryClientProvider
import org.apache.kyuubi.ha.client.DiscoveryPaths
+import org.apache.kyuubi.metrics.MetricsConstants.ENGINE_TOTAL
+import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.util.NamedThreadFactory
import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
@@ -37,6 +40,7 @@ class EngineRefSuite extends KyuubiFunSuite {
private val zkServer = new EmbeddedZookeeper
private val conf = KyuubiConf()
private val user = Utils.currentUser
+ private val metricsSystem = new MetricsSystem
override def beforeAll(): Unit = {
val zkData = Utils.createTempDir()
@@ -45,10 +49,13 @@ class EngineRefSuite extends KyuubiFunSuite {
.set("spark.sql.catalogImplementation", "in-memory")
zkServer.initialize(conf)
zkServer.start()
+ metricsSystem.initialize(conf)
+ metricsSystem.start()
super.beforeAll()
}
override def afterAll(): Unit = {
+ metricsSystem.stop()
zkServer.stop()
super.afterAll()
}
@@ -56,6 +63,8 @@ class EngineRefSuite extends KyuubiFunSuite {
override def beforeEach(): Unit = {
conf.unset(KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN)
conf.unset(KyuubiConf.ENGINE_SHARE_LEVEL_SUB_DOMAIN)
+ conf.unset(KyuubiConf.ENGINE_POOL_SIZE)
+ conf.unset(KyuubiConf.ENGINE_POOL_NAME)
super.beforeEach()
}
@@ -247,4 +256,90 @@ class EngineRefSuite extends KyuubiFunSuite {
assert(port2 == port1, "engine shared")
}
}
+
+ test("different engine type should use its own lock") {
+ conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
+ conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
+ conf.set(KyuubiConf.ENGINE_INIT_TIMEOUT, 3000L)
+ conf.set(HighAvailabilityConf.HA_ZK_NAMESPACE, "engine_test1")
+ conf.set(HighAvailabilityConf.HA_ZK_QUORUM, zkServer.getConnectString)
+ val conf1 = conf.clone
+ conf1.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString)
+ val conf2 = conf.clone
+ conf2.set(KyuubiConf.ENGINE_TYPE, HIVE_SQL.toString)
+
+ val start = System.currentTimeMillis()
+ val times = new Array[Long](2)
+ val executor = Executors.newFixedThreadPool(2)
+ try {
+ executor.execute(() => {
+ DiscoveryClientProvider.withDiscoveryClient(conf1) { client =>
+ try {
+ new EngineRef(conf1, user, UUID.randomUUID().toString, null)
+ .getOrCreate(client)
+ } finally {
+ times(0) = System.currentTimeMillis()
+ }
+ }
+ })
+ executor.execute(() => {
+ DiscoveryClientProvider.withDiscoveryClient(conf2) { client =>
+ try {
+ new EngineRef(conf2, user, UUID.randomUUID().toString, null)
+ .getOrCreate(client)
+ } finally {
+ times(1) = System.currentTimeMillis()
+ }
+ }
+ })
+
+ eventually(timeout(10.seconds), interval(200.milliseconds)) {
+ assert(times.forall(_ > start))
+ // ENGINE_INIT_TIMEOUT is 3000ms
+ assert(times.max - times.min < 2500)
+ }
+ } finally {
+ executor.shutdown()
+ }
+ }
+
+ test("three same lock request with initialization timeout") {
+ val id = UUID.randomUUID().toString
+ conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
+ conf.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString)
+ conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
+ conf.set(KyuubiConf.ENGINE_INIT_TIMEOUT, 3000L)
+ conf.set(HighAvailabilityConf.HA_ZK_NAMESPACE, "engine_test2")
+ conf.set(HighAvailabilityConf.HA_ZK_QUORUM, zkServer.getConnectString)
+
+ val beforeEngines = MetricsSystem.counterValue(ENGINE_TOTAL).getOrElse(0L)
+ val start = System.currentTimeMillis()
+ val times = new Array[Long](3)
+ val executor = Executors.newFixedThreadPool(3)
+ try {
+ (0 until (3)).foreach { i =>
+ val cloned = conf.clone
+ executor.execute(() => {
+ DiscoveryClientProvider.withDiscoveryClient(cloned) { client =>
+ try {
+ new EngineRef(cloned, user, id, null).getOrCreate(client)
+ } finally {
+ times(i) = System.currentTimeMillis()
+ }
+ }
+ })
+ }
+
+ eventually(timeout(20.seconds), interval(200.milliseconds)) {
+ assert(times.forall(_ > start))
+ // ENGINE_INIT_TIMEOUT is 3000ms
+ assert(times.max - times.min > 2800)
+ }
+
+ // we should only submit two engines, the last request should timeout
and fail
+ assert(MetricsSystem.counterValue(ENGINE_TOTAL).get - beforeEngines == 2)
+ } finally {
+ executor.shutdown()
+ }
+ }
}