This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new c25961ea0 [KYUUBI #2887] Add a POLLING balance policy for spark engine pool
c25961ea0 is described below

commit c25961ea0271ea38e67e046fc3696605df2958b8
Author: yuqi <[email protected]>
AuthorDate: Mon Oct 24 14:17:59 2022 +0800

    [KYUUBI #2887] Add a POLLING balance policy for spark engine pool
    
    ### _Why are the changes needed?_
    
    As described in #2887, the RANDOM policy may create hot spots in the
    engine pool or even cause production incidents. This patch adds a
    POLLING (round-robin) balance policy to avoid this problem; enable it
    with "set kyuubi.engine.pool.balance.policy = POLLING".
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run tests](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #3662 from ychris78/epbp_1018.
    
    Closes #2887
    
    90834c1f [yuqi] fix bug
    c0ce8a7e [yuqi] Fix typos
    bec4ca13 [yuqi] Fix typos
    dc4e3637 [yuqi] please add test case to cover the added methods
    837c920b [yuqi] rename method getAndInc to getAndIncrement and modify the 
doc
    02a8311f [yuqi] add getAndInc method and refer it
    9436937d [yuqi] Improve configuration documentation
    21b5d7d1 [yuqi] [KYUUBI #2887] Add a polling balance policy for spark 
engine pool Improve configuration documentation and delete useless comments
    f0699b3c [yuqi] [KYUUBI #2887] Add a polling balance policy for spark 
engine pool
    
    Authored-by: yuqi <[email protected]>
    Signed-off-by: Fei Wang <[email protected]>
---
 docs/deployment/settings.md                        |  1 +
 .../org/apache/kyuubi/config/KyuubiConf.scala      | 11 ++++++
 .../apache/kyuubi/ha/client/DiscoveryClient.scala  | 13 +++++++
 .../ha/client/etcd/EtcdDiscoveryClient.scala       | 18 +++++++++
 .../zookeeper/ZookeeperDiscoveryClient.scala       | 15 ++++++++
 .../kyuubi/ha/client/DiscoveryClientTests.scala    | 20 ++++++++++
 .../scala/org/apache/kyuubi/engine/EngineRef.scala | 45 ++++++++++++++--------
 .../org/apache/kyuubi/engine/EngineRefTests.scala  | 13 +++++++
 8 files changed, 120 insertions(+), 16 deletions(-)

diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index de61e18ee..caaae3d9d 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -243,6 +243,7 @@ kyuubi.engine.jdbc.memory|1g|The heap memory for the jdbc 
query engine|string|1.
 kyuubi.engine.jdbc.type|&lt;undefined&gt;|The short name of jdbc 
type|string|1.6.0
 kyuubi.engine.operation.convert.catalog.database.enabled|true|When set to 
true, The engine converts the JDBC methods of set/get Catalog and set/get 
Schema to the implementation of different engines|boolean|1.6.0
 kyuubi.engine.operation.log.dir.root|engine_operation_logs|Root directory for 
query operation log at engine-side.|string|1.4.0
+kyuubi.engine.pool.balance.policy|RANDOM|The balance policy of queries in 
engine pool.<ul> <li>RANDOM - Randomly use the engine in the pool</li> 
<li>POLLING - Polling use the engine in the pool</li> </ul>|string|1.7.0
 kyuubi.engine.pool.name|engine-pool|The name of engine pool.|string|1.5.0
 kyuubi.engine.pool.size|-1|The size of engine pool. Note that, if the size is 
less than 1, the engine pool will not be enabled; otherwise, the size of the 
engine pool will be min(this, kyuubi.engine.pool.size.threshold).|int|1.4.0
 kyuubi.engine.pool.size.threshold|9|This parameter is introduced as a 
server-side parameter, and controls the upper limit of the engine 
pool.|int|1.4.0
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 93ef7d15d..ddefa455c 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1408,6 +1408,17 @@ object KyuubiConf {
     .intConf
     .createWithDefault(-1)
 
+  val ENGINE_POOL_BALANCE_POLICY: ConfigEntry[String] =
+    buildConf("kyuubi.engine.pool.balance.policy")
+      .doc("The balance policy of queries in engine pool.<ul>" +
+        " <li>RANDOM - Randomly use the engine in the pool</li>" +
+        " <li>POLLING - Polling use the engine in the pool</li> </ul>")
+      .version("1.7.0")
+      .stringConf
+      .transform(_.toUpperCase(Locale.ROOT))
+      .checkValues(Set("RANDOM", "POLLING"))
+      .createWithDefault("RANDOM")
+
   val ENGINE_INITIALIZE_SQL: ConfigEntry[Seq[String]] =
     buildConf("kyuubi.engine.initialize.sql")
       .doc("SemiColon-separated list of SQL statements to be initialized in 
the newly created " +
diff --git 
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClient.scala 
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClient.scala
index 522487ef7..29c1734f8 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClient.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/DiscoveryClient.scala
@@ -45,6 +45,11 @@ trait DiscoveryClient extends Logging {
    */
   def getData(path: String): Array[Byte]
 
+  /**
+   * Set the data under path.
+   */
+  def setData(path: String, data: Array[Byte]): Boolean
+
   /**
    * Get the paths under given path.
    * @return list of path
@@ -164,6 +169,14 @@ trait DiscoveryClient extends Logging {
       basePath: String,
       initData: String,
       useProtection: Boolean = false): Unit
+
+  /**
+   * Atomically get an Int number and add one
+   * @param path the path of stored data,
+   *             If the path does not exist, it will be created and 
initialized to 0
+   * @return the stored data under path
+   */
+  def getAndIncrement(path: String): Int
 }
 
 object DiscoveryClient {
diff --git 
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala
 
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala
index 56545bf9e..e3dd2f319 100644
--- 
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala
+++ 
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala
@@ -124,6 +124,11 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends 
DiscoveryClient {
     }
   }
 
+  def setData(path: String, data: Array[Byte]): Boolean = {
+    val response = kvClient.put(ByteSequence.from(path.getBytes), 
ByteSequence.from(data)).get()
+    response != null
+  }
+
   def getChildren(path: String): List[String] = {
     val kvs = kvClient.get(
       ByteSequence.from(path.getBytes()),
@@ -290,6 +295,19 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends 
DiscoveryClient {
       ByteSequence.from(initData.getBytes())).get()
   }
 
+  def getAndIncrement(path: String): Int = {
+    val lockPath = s"${path}_tmp_for_lock"
+    tryWithLock(lockPath, 60 * 1000) {
+      if (pathNonExists(path)) {
+        create(path, "PERSISTENT")
+        setData(path, String.valueOf(0).getBytes)
+      }
+      val s = new String(getData(path)).toInt
+      setData(path, String.valueOf(s + 1).getBytes)
+      s
+    }
+  }
+
   private def createPersistentNode(
       conf: KyuubiConf,
       namespace: String,
diff --git 
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala
 
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala
index 965bac417..451d8544e 100644
--- 
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala
+++ 
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClient.scala
@@ -26,6 +26,7 @@ import scala.collection.JavaConverters._
 
 import com.google.common.annotations.VisibleForTesting
 import org.apache.curator.framework.CuratorFramework
+import org.apache.curator.framework.recipes.atomic.{AtomicValue, 
DistributedAtomicInteger}
 import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex
 import org.apache.curator.framework.recipes.nodes.PersistentNode
 import org.apache.curator.framework.state.ConnectionState
@@ -33,6 +34,7 @@ import 
org.apache.curator.framework.state.ConnectionState.CONNECTED
 import org.apache.curator.framework.state.ConnectionState.LOST
 import org.apache.curator.framework.state.ConnectionState.RECONNECTED
 import org.apache.curator.framework.state.ConnectionStateListener
+import org.apache.curator.retry.RetryForever
 import org.apache.curator.utils.ZKPaths
 import org.apache.zookeeper.CreateMode
 import org.apache.zookeeper.CreateMode.PERSISTENT
@@ -86,6 +88,10 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends 
DiscoveryClient {
     zkClient.getData.forPath(path)
   }
 
+  def setData(path: String, data: Array[Byte]): Boolean = {
+    zkClient.setData().forPath(path, data) != null
+  }
+
   def getChildren(path: String): List[String] = {
     zkClient.getChildren.forPath(path).asScala.toList
   }
@@ -290,6 +296,15 @@ class ZookeeperDiscoveryClient(conf: KyuubiConf) extends 
DiscoveryClient {
     secretNode.start()
   }
 
+  def getAndIncrement(path: String): Int = {
+    val dai = new DistributedAtomicInteger(zkClient, path, new 
RetryForever(1000))
+    var atomicVal: AtomicValue[Integer] = null
+    do {
+      atomicVal = dai.increment()
+    } while (atomicVal == null || !atomicVal.succeeded())
+    atomicVal.preValue().intValue()
+  }
+
   /**
    * Refer to the implementation of HIVE-11581 to simplify user connection 
parameters.
    * https://issues.apache.org/jira/browse/HIVE-11581
diff --git 
a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientTests.scala
 
b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientTests.scala
index 585b51bba..87db340b5 100644
--- 
a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientTests.scala
+++ 
b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/DiscoveryClientTests.scala
@@ -150,4 +150,24 @@ trait DiscoveryClientTests extends KyuubiFunSuite {
       assert(e.getMessage contains s"Timeout to lock on path [$lockPath]")
     }
   }
+
+  test("getAndIncrement method test") {
+    withDiscoveryClient(conf) { discoveryClient =>
+      (0 until 10).foreach { i =>
+        val ii = discoveryClient.getAndIncrement("/get_and_increment_test")
+        assert(i == ii)
+      }
+    }
+  }
+
+  test("setData method test") {
+    withDiscoveryClient(conf) { discoveryClient =>
+      val data = "abc";
+      val path = "/setData_test"
+      discoveryClient.create(path, "PERSISTENT")
+      discoveryClient.setData(path, data.getBytes)
+      val dataFromGet = new String(discoveryClient.getData(path))
+      assert(data == dataFromGet)
+    }
+  }
 }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index 10ffb87c9..ed6282917 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -37,7 +37,7 @@ import org.apache.kyuubi.engine.jdbc.JdbcProcessBuilder
 import org.apache.kyuubi.engine.spark.SparkProcessBuilder
 import org.apache.kyuubi.engine.trino.TrinoProcessBuilder
 import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ENGINE_REF_ID, 
HA_NAMESPACE}
-import org.apache.kyuubi.ha.client.{DiscoveryClient, DiscoveryPaths}
+import org.apache.kyuubi.ha.client.{DiscoveryClient, DiscoveryClientProvider, 
DiscoveryPaths}
 import org.apache.kyuubi.metrics.MetricsConstants.{ENGINE_FAIL, 
ENGINE_TIMEOUT, ENGINE_TOTAL}
 import org.apache.kyuubi.metrics.MetricsSystem
 import org.apache.kyuubi.operation.log.OperationLog
@@ -72,27 +72,14 @@ private[kyuubi] class EngineRef(
 
   private val clientPoolName: String = conf.get(ENGINE_POOL_NAME)
 
+  private val enginePoolBalancePolicy: String = 
conf.get(ENGINE_POOL_BALANCE_POLICY)
+
   // In case the multi kyuubi instances have the small gap of timeout, here we 
add
   // a small amount of time for timeout
   private val LOCK_TIMEOUT_SPAN_FACTOR = if (Utils.isTesting) 0.5 else 0.1
 
   private var builder: ProcBuilder = _
 
-  @VisibleForTesting
-  private[kyuubi] val subdomain: String = 
conf.get(ENGINE_SHARE_LEVEL_SUBDOMAIN) match {
-    case Some(_subdomain) => _subdomain
-    case None if clientPoolSize > 0 =>
-      val poolSize = math.min(clientPoolSize, poolThreshold)
-      if (poolSize < clientPoolSize) {
-        warn(s"Request engine pool size($clientPoolSize) exceeds, fallback to 
" +
-          s"system threshold $poolThreshold")
-      }
-      // TODO: Currently, we use random policy, and later we can add a 
sequential policy,
-      //  such as AtomicInteger % poolSize.
-      s"$clientPoolName-${Random.nextInt(poolSize)}"
-    case _ => "default" // [KYUUBI #1293]
-  }
-
   // Launcher of the engine
   private[kyuubi] val appUser: String = shareLevel match {
     case SERVER => Utils.currentUser
@@ -109,6 +96,32 @@ private[kyuubi] class EngineRef(
     case _ => user
   }
 
+  @VisibleForTesting
+  private[kyuubi] val subdomain: String = 
conf.get(ENGINE_SHARE_LEVEL_SUBDOMAIN) match {
+    case Some(_subdomain) => _subdomain
+    case None if clientPoolSize > 0 =>
+      val poolSize = math.min(clientPoolSize, poolThreshold)
+      if (poolSize < clientPoolSize) {
+        warn(s"Request engine pool size($clientPoolSize) exceeds, fallback to 
" +
+          s"system threshold $poolThreshold")
+      }
+      val seqNum = enginePoolBalancePolicy match {
+        case "POLLING" =>
+          val snPath =
+            DiscoveryPaths.makePath(
+              s"${serverSpace}_${KYUUBI_VERSION}_${shareLevel}_$engineType",
+              "seq_num",
+              Array(appUser, clientPoolName))
+          DiscoveryClientProvider.withDiscoveryClient(conf) { client =>
+            client.getAndIncrement(snPath)
+          }
+        case "RANDOM" =>
+          Random.nextInt(poolSize)
+      }
+      s"$clientPoolName-${seqNum % poolSize}"
+    case _ => "default" // [KYUUBI #1293]
+  }
+
   /**
    * The default engine name, used as default `spark.app.name` if not set
    */
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefTests.scala 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefTests.scala
index 538a36805..e2cf0d51c 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefTests.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefTests.scala
@@ -209,6 +209,19 @@ trait EngineRefTests extends KyuubiFunSuite {
     conf.set(ENGINE_POOL_SIZE, 3)
     val engine6 = new EngineRef(conf, user, id, null)
     assert(engine6.subdomain.startsWith(s"$enginePoolName-"))
+
+    conf.unset(ENGINE_SHARE_LEVEL_SUBDOMAIN)
+    conf.set(ENGINE_POOL_SIZE, 5)
+    val pool_name = "pool_default_name"
+    conf.set(ENGINE_POOL_NAME, pool_name)
+    conf.set(HighAvailabilityConf.HA_NAMESPACE, "engine_test")
+    conf.set(HighAvailabilityConf.HA_ADDRESSES, getConnectString())
+    conf.set(ENGINE_POOL_BALANCE_POLICY, "POLLING")
+    (0 until (10)).foreach { i =>
+      val engine7 = new EngineRef(conf, user, id, null)
+      val engineNumber = 
Integer.parseInt(engine7.subdomain.substring(pool_name.length + 1))
+      assert(engineNumber == (i % conf.get(ENGINE_POOL_SIZE)))
+    }
   }
 
   test("start and get engine address with lock") {

Reply via email to