This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new c25961ea0 [KYUUBI #2887] Add a POLLING balance policy for spark engine
pool
c25961ea0 is described below
commit c25961ea0271ea38e67e046fc3696605df2958b8
Author: yuqi <[email protected]>
AuthorDate: Mon Oct 24 14:17:59 2022 +0800
[KYUUBI #2887] Add a POLLING balance policy for spark engine pool
### _Why are the changes needed?_
As described in #2887, the RANDOM policy can concentrate queries on a few engines
in the pool, causing hot spots and even production incidents. This patch adds a
POLLING (round-robin) balance policy to avoid this problem; enable it with "set kyuubi.engine.pool.balance.policy = POLLING".
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #3662 from ychris78/epbp_1018.
Closes #2887
90834c1f [yuqi] fix bug
c0ce8a7e [yuqi] Fix typos
bec4ca13 [yuqi] Fix typos
dc4e3637 [yuqi] please add test case to cover the added methods
837c920b [yuqi] rename method getAndInc to getAndIncrement and modify the
doc
02a8311f [yuqi] add getAndInc method and refer it
9436937d [yuqi] Improve configuration documentation
21b5d7d1 [yuqi] [KYUUBI #2887] Add a polling balance policy for spark
engine pool Improve configuration documentation and delete useless comments
f0699b3c [yuqi] [KYUUBI #2887] Add a polling balance policy for spark
engine pool
Authored-by: yuqi <[email protected]>
Signed-off-by: Fei Wang <[email protected]>
---
docs/deployment/settings.md | 1 +
.../org/apache/kyuubi/config/KyuubiConf.scala | 11 ++++++
.../apache/kyuubi/ha/client/DiscoveryClient.scala | 13 +++++++
.../ha/client/etcd/EtcdDiscoveryClient.scala | 18 +++++++++
.../zookeeper/ZookeeperDiscoveryClient.scala | 15 ++++++++
.../kyuubi/ha/client/DiscoveryClientTests.scala | 20 ++++++++++
.../scala/org/apache/kyuubi/engine/EngineRef.scala | 45 ++++++++++++++--------
.../org/apache/kyuubi/engine/EngineRefTests.scala | 13 +++++++
8 files changed, 120 insertions(+), 16 deletions(-)
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index de61e18ee..caaae3d9d 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -243,6 +243,7 @@ kyuubi.engine.jdbc.memory|1g|The heap memory for the jdbc
query engine|string|1.
kyuubi.engine.jdbc.type|<undefined>|The short name of jdbc
type|string|1.6.0
kyuubi.engine.operation.convert.catalog.database.enabled|true|When set to
true, The engine converts the JDBC methods of set/get Catalog and set/get
Schema to the implementation of different engines|boolean|1.6.0
kyuubi.engine.operation.log.dir.root|engine_operation_logs|Root directory for
query operation log at engine-side.|string|1.4.0
+kyuubi.engine.pool.balance.policy|RANDOM|The balance policy of queries in
engine pool.<ul> <li>RANDOM - Randomly use the engine in the pool</li>
<li>POLLING - Polling use the engine in the pool</li> </ul>|string|1.7.0
kyuubi.engine.pool.name|engine-pool|The name of engine pool.|string|1.5.0
kyuubi.engine.pool.size|-1|The size of engine pool. Note that, if the size is
less than 1, the engine pool will not be enabled; otherwise, the size of the
engine pool will be min(this, kyuubi.engine.pool.size.threshold).|int|1.4.0
kyuubi.engine.pool.size.threshold|9|This parameter is introduced as a
server-side parameter, and controls the upper limit of the engine
pool.|int|1.4.0
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 93ef7d15d..ddefa455c 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1408,6 +1408,17 @@ object KyuubiConf {
.intConf
.createWithDefault(-1)
+ // Selects how an engine-pool member is picked when no explicit subdomain is configured.
+ // The value is upper-cased (Locale.ROOT) before validation, so "polling" is accepted.
+ val ENGINE_POOL_BALANCE_POLICY: ConfigEntry[String] =
+ buildConf("kyuubi.engine.pool.balance.policy")
+ .doc("The balance policy of queries in engine pool.<ul>" +
+ " <li>RANDOM - Randomly use the engine in the pool</li>" +
+ " <li>POLLING - Polling use the engine in the pool</li> </ul>")
+ .version("1.7.0")
+ .stringConf
+ .transform(_.toUpperCase(Locale.ROOT))
+ .checkValues(Set("RANDOM", "POLLING"))
+ .createWithDefault("RANDOM")
+
val ENGINE_INITIALIZE_SQL: ConfigEntry[Seq[String]] =
buildConf("kyuubi.engine.initialize.sql")
.doc("SemiColon-separated list of SQL statements to be initialized in
the newly created " +
diff --git
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClient.scala
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClient.scala
index 522487ef7..29c1734f8 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClient.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClient.scala
@@ -45,6 +45,11 @@ trait DiscoveryClient extends Logging {
*/
def getData(path: String): Array[Byte]
+ /**
+ * Replace the data stored under path.
+ * @param path the path whose data is written
+ * @param data the new content
+ * @return true when the backend returned a non-null response for the write
+ */
+ def setData(path: String, data: Array[Byte]): Boolean
+
/**
* Get the paths under given path.
* @return list of path
@@ -164,6 +169,14 @@ trait DiscoveryClient extends Logging {
basePath: String,
initData: String,
useProtection: Boolean = false): Unit
+
+ /**
+ * Atomically read the Int counter stored under path and increment it by one.
+ * @param path the path of the stored counter. If the path does not exist, it is
+ * created and initialized to 0 first.
+ * @return the counter value before the increment, so the first call on a new path
+ * returns 0
+ */
+ def getAndIncrement(path: String): Int
}
object DiscoveryClient {
diff --git
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala
index 56545bf9e..e3dd2f319 100644
---
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala
+++
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala
@@ -124,6 +124,11 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends
DiscoveryClient {
}
}
+ // Writes `data` under the key `path`, blocking on the put result via get().
+ // Success is reported as a non-null put response.
+ def setData(path: String, data: Array[Byte]): Boolean = {
+ val response = kvClient.put(ByteSequence.from(path.getBytes),
ByteSequence.from(data)).get()
+ response != null
+ }
+
def getChildren(path: String): List[String] = {
val kvs = kvClient.get(
ByteSequence.from(path.getBytes()),
@@ -290,6 +295,19 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends
DiscoveryClient {
ByteSequence.from(initData.getBytes())).get()
}
+ // Implemented as a read-modify-write guarded by a distributed lock on a sibling path
+ // (`<path>_tmp_for_lock`). The counter is stored as a decimal string and is created
+ // with value 0 on first use.
+ // NOTE(review): `s + 1` is plain Int arithmetic and wraps to Int.MinValue past
+ // Int.MaxValue, so callers must not assume the returned value is non-negative.
+ def getAndIncrement(path: String): Int = {
+ val lockPath = s"${path}_tmp_for_lock"
+ // 60 * 1000 is presumably a lock timeout in milliseconds; confirm against tryWithLock.
+ tryWithLock(lockPath, 60 * 1000) {
+ if (pathNonExists(path)) {
+ create(path, "PERSISTENT")
+ setData(path, String.valueOf(0).getBytes)
+ }
+ val s = new String(getData(path)).toInt
+ setData(path, String.valueOf(s + 1).getBytes)
+ // Return the value read before the increment.
+ s
+ }
+ }
+
private def createPersistentNode(
conf: KyuubiConf,
namespace: String,
diff --git
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala
index 965bac417..451d8544e 100644
---
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala
+++
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala
@@ -26,6 +26,7 @@ import scala.collection.JavaConverters._
import com.google.common.annotations.VisibleForTesting
import org.apache.curator.framework.CuratorFramework
+import org.apache.curator.framework.recipes.atomic.{AtomicValue,
DistributedAtomicInteger}
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex
import org.apache.curator.framework.recipes.nodes.PersistentNode
import org.apache.curator.framework.state.ConnectionState
@@ -33,6 +34,7 @@ import
org.apache.curator.framework.state.ConnectionState.CONNECTED
import org.apache.curator.framework.state.ConnectionState.LOST
import org.apache.curator.framework.state.ConnectionState.RECONNECTED
import org.apache.curator.framework.state.ConnectionStateListener
+import org.apache.curator.retry.RetryForever
import org.apache.curator.utils.ZKPaths
import org.apache.zookeeper.CreateMode
import org.apache.zookeeper.CreateMode.PERSISTENT
@@ -86,6 +88,10 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends
DiscoveryClient {
zkClient.getData.forPath(path)
}
+ // Overwrites the data of `path` via Curator's setData builder. Success is reported as a
+ // non-null result from forPath.
+ // NOTE(review): a missing node presumably raises an exception rather than returning
+ // false; confirm before relying on the Boolean.
+ def setData(path: String, data: Array[Byte]): Boolean = {
+ zkClient.setData().forPath(path, data) != null
+ }
+
def getChildren(path: String): List[String] = {
zkClient.getChildren.forPath(path).asScala.toList
}
@@ -290,6 +296,15 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends
DiscoveryClient {
secretNode.start()
}
+ // Backed by Curator's DistributedAtomicInteger at `path`, using a RetryForever(1000)
+ // policy (presumably 1000 ms between retries). The loop re-issues the increment until
+ // Curator reports success, then returns the value before the increment (preValue).
+ // NOTE(review): the counter is a 32-bit Integer; presumably it wraps past Int.MaxValue,
+ // so callers must not assume the returned value is non-negative.
+ def getAndIncrement(path: String): Int = {
+ val dai = new DistributedAtomicInteger(zkClient, path, new
RetryForever(1000))
+ var atomicVal: AtomicValue[Integer] = null
+ do {
+ atomicVal = dai.increment()
+ } while (atomicVal == null || !atomicVal.succeeded())
+ atomicVal.preValue().intValue()
+ }
+
/**
* Refer to the implementation of HIVE-11581 to simplify user connection
parameters.
* https://issues.apache.org/jira/browse/HIVE-11581
diff --git
a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientTests.scala
b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientTests.scala
index 585b51bba..87db340b5 100644
---
a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientTests.scala
+++
b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientTests.scala
@@ -150,4 +150,24 @@ trait DiscoveryClientTests extends KyuubiFunSuite {
assert(e.getMessage contains s"Timeout to lock on path [$lockPath]")
}
}
+
+ test("getAndIncrement method test") {
+ withDiscoveryClient(conf) { discoveryClient =>
+ // On a fresh path the counter starts at 0 and each call returns the value before
+ // incrementing, so the i-th call must return i.
+ (0 until 10).foreach { i =>
+ val ii = discoveryClient.getAndIncrement("/get_and_increment_test")
+ assert(i == ii)
+ }
+ }
+ }
+
+ test("setData method test") {
+ withDiscoveryClient(conf) { discoveryClient =>
+ val data = "abc"
+ val path = "/setData_test"
+ discoveryClient.create(path, "PERSISTENT")
+ // setData reports success through its Boolean result: assert it, then read the data back.
+ assert(discoveryClient.setData(path, data.getBytes))
+ val dataFromGet = new String(discoveryClient.getData(path))
+ assert(data == dataFromGet)
+ }
+ }
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index 10ffb87c9..ed6282917 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -37,7 +37,7 @@ import org.apache.kyuubi.engine.jdbc.JdbcProcessBuilder
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
import org.apache.kyuubi.engine.trino.TrinoProcessBuilder
import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ENGINE_REF_ID,
HA_NAMESPACE}
-import org.apache.kyuubi.ha.client.{DiscoveryClient, DiscoveryPaths}
+import org.apache.kyuubi.ha.client.{DiscoveryClient, DiscoveryClientProvider,
DiscoveryPaths}
import org.apache.kyuubi.metrics.MetricsConstants.{ENGINE_FAIL,
ENGINE_TIMEOUT, ENGINE_TOTAL}
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.log.OperationLog
@@ -72,27 +72,14 @@ private[kyuubi] class EngineRef(
private val clientPoolName: String = conf.get(ENGINE_POOL_NAME)
+ private val enginePoolBalancePolicy: String =
conf.get(ENGINE_POOL_BALANCE_POLICY)
+
// In case the multi kyuubi instances have the small gap of timeout, here we
add
// a small amount of time for timeout
private val LOCK_TIMEOUT_SPAN_FACTOR = if (Utils.isTesting) 0.5 else 0.1
private var builder: ProcBuilder = _
- @VisibleForTesting
- private[kyuubi] val subdomain: String =
conf.get(ENGINE_SHARE_LEVEL_SUBDOMAIN) match {
- case Some(_subdomain) => _subdomain
- case None if clientPoolSize > 0 =>
- val poolSize = math.min(clientPoolSize, poolThreshold)
- if (poolSize < clientPoolSize) {
- warn(s"Request engine pool size($clientPoolSize) exceeds, fallback to
" +
- s"system threshold $poolThreshold")
- }
- // TODO: Currently, we use random policy, and later we can add a
sequential policy,
- // such as AtomicInteger % poolSize.
- s"$clientPoolName-${Random.nextInt(poolSize)}"
- case _ => "default" // [KYUUBI #1293]
- }
-
// Launcher of the engine
private[kyuubi] val appUser: String = shareLevel match {
case SERVER => Utils.currentUser
@@ -109,6 +96,32 @@ private[kyuubi] class EngineRef(
case _ => user
}
+ /**
+ * The subdomain (engine-pool member) this reference binds to. An explicit
+ * ENGINE_SHARE_LEVEL_SUBDOMAIN wins. Otherwise, when the engine pool is enabled, one of
+ * `clientPoolName-0 .. clientPoolName-(poolSize - 1)` is chosen according to
+ * ENGINE_POOL_BALANCE_POLICY. In all other cases the subdomain is "default".
+ *
+ * This val is declared after `appUser` because the POLLING branch reads `appUser`, and
+ * vals are initialized in declaration order.
+ */
+ @VisibleForTesting
+ private[kyuubi] val subdomain: String =
conf.get(ENGINE_SHARE_LEVEL_SUBDOMAIN) match {
+ case Some(_subdomain) => _subdomain
+ case None if clientPoolSize > 0 =>
+ val poolSize = math.min(clientPoolSize, poolThreshold)
+ if (poolSize < clientPoolSize) {
+ warn(s"Request engine pool size($clientPoolSize) exceeds, fallback to
" +
+ s"system threshold $poolThreshold")
+ }
+ val seqNum = enginePoolBalancePolicy match {
+ case "POLLING" =>
+ // A counter shared by all Kyuubi servers for this user/pool, so round-robin
+ // order holds across instances.
+ val snPath =
+ DiscoveryPaths.makePath(
+ s"${serverSpace}_${KYUUBI_VERSION}_${shareLevel}_$engineType",
+ "seq_num",
+ Array(appUser, clientPoolName))
+ DiscoveryClientProvider.withDiscoveryClient(conf) { client =>
+ client.getAndIncrement(snPath)
+ }
+ case "RANDOM" =>
+ Random.nextInt(poolSize)
+ }
+ // The shared counter grows without bound and wraps to a negative Int past Int.MaxValue.
+ // floorMod keeps the index in [0, poolSize), whereas `%` would yield e.g. "pool--2".
+ s"$clientPoolName-${Math.floorMod(seqNum, poolSize)}"
+ case _ => "default" // [KYUUBI #1293]
+ }
+
/**
* The default engine name, used as default `spark.app.name` if not set
*/
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefTests.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefTests.scala
index 538a36805..e2cf0d51c 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefTests.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefTests.scala
@@ -209,6 +209,19 @@ trait EngineRefTests extends KyuubiFunSuite {
conf.set(ENGINE_POOL_SIZE, 3)
val engine6 = new EngineRef(conf, user, id, null)
assert(engine6.subdomain.startsWith(s"$enginePoolName-"))
+
+ conf.unset(ENGINE_SHARE_LEVEL_SUBDOMAIN)
+ conf.set(ENGINE_POOL_SIZE, 5)
+ val pool_name = "pool_default_name"
+ conf.set(ENGINE_POOL_NAME, pool_name)
+ conf.set(HighAvailabilityConf.HA_NAMESPACE, "engine_test")
+ conf.set(HighAvailabilityConf.HA_ADDRESSES, getConnectString())
+ conf.set(ENGINE_POOL_BALANCE_POLICY, "POLLING")
+ (0 until (10)).foreach { i =>
+ val engine7 = new EngineRef(conf, user, id, null)
+ val engineNumber =
Integer.parseInt(engine7.subdomain.substring(pool_name.length + 1))
+ assert(engineNumber == (i % conf.get(ENGINE_POOL_SIZE)))
+ }
}
test("start and get engine address with lock") {