This is an automated email from the ASF dual-hosted git repository.
bowenliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 8862767827 [KYUUBI #6034] Kyuubi Server HA&ZK get server from
serverHosts support more strategy
8862767827 is described below
commit 88627678273636b78c235f20be65ce274e6d9806
Author: Bowen Liang <[email protected]>
AuthorDate: Wed Oct 23 16:57:09 2024 +0800
[KYUUBI #6034] Kyuubi Server HA&ZK get server from serverHosts support more
strategy
# :mag: Description
## Issue References
This pull request fixes #6034
## Describe Your Solution
Currently, when beeline connects to a Kyuubi server in HA mode, the only
server selection strategy is random, which can leave some machines with a
high load. This PR makes the selection strategy configurable.
[description]
First, note that beeline connects to the Kyuubi server through
kyuubi-hive-jdbc, which is isolated from the Kyuubi cluster, so the code only
supports choosing a serverHost at random from the zk node /${namespace}.
Because kyuubi-hive-jdbc is a stateless module that runs only once, it cannot
keep any variable about which serverHost was obtained from the zk node.
[Solution]
This PR adds an interface named ChooseServerStrategy for choosing a
serverHost. It provides two built-in strategies and supports user-defined ones:
1. poll: creates a zk node named ${namespace}-counter. Each time a beeline
client connects to the Kyuubi server, the counter is incremented by 1 and its
value is taken modulo the number of serverHosts, like
counter % serverHost.size, so serverHosts are chosen in order.
2. random: picks a serverHost at random from serverHosts.
3. User Defined Class: implement ChooseServerStrategy and put the jar in
beeline-jars; your strategy is then used to choose the serverHost.
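As a rough illustration of the counter-based strategy described above, here is a minimal sketch. It assumes an in-memory `AtomicInteger` as a stand-in for the shared zk counter node, and the class name `RoundRobinSketch` is hypothetical; it is not the code from this commit. It also uses `Math.floorMod` rather than a plain `%`, which is my own choice so that the index stays non-negative if the int counter ever wraps around.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of counter-based selection; not the code from this commit. */
final class RoundRobinSketch {
  // Stand-in for the shared zk counter node; a real client would keep this in ZooKeeper.
  private final AtomicInteger counter = new AtomicInteger(0);

  /** Picks the host at (previous counter value mod host count), then advances the counter. */
  String choose(List<String> serverHosts) {
    int previous = counter.getAndIncrement();
    // floorMod keeps the index non-negative even after the int counter overflows.
    return serverHosts.get(Math.floorMod(previous, serverHosts.size()));
  }

  public static void main(String[] args) {
    RoundRobinSketch sketch = new RoundRobinSketch();
    List<String> hosts = Arrays.asList("host-a", "host-b");
    // Four picks over two hosts cycle through the list twice.
    for (int i = 0; i < 4; i++) {
      System.out.println(sketch.choose(hosts));
    }
  }
}
```

Because the counter starts at 0 and the previous value is used for the index, the first client gets the first serverHost in the list.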
## Types of changes :bookmark:
- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
## Test Plan
Tested the strategies in my test cluster.
#### Behavior Without This Pull Request :coffin:



#### Behavior With This Pull Request :tada:
[Use Case]
1. poll: `bin/beeline -u
'jdbc:hive2://xxx:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=kyuubi;zooKeeperStrategy=poll?spark.yarn.queue=root.kylin;spark.app.name=testspark;spark.shuffle.useOldFetchProtocol=true'
-n mfw_hadoop --verbose=true --showNestedErrs=true`
2. random: `bin/beeline -u
'jdbc:hive2://xxx:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=kyuubi;zooKeeperStrategy=random?spark.yarn.queue=root.kylin;spark.app.name=testspark;spark.shuffle.useOldFetchProtocol=true'
-n mfw_hadoop --verbose=true --showNestedErrs=true` or `bin/beeline -u
'jdbc:hive2://xxx:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=kyuubi?spark.yarn.queue=root.kylin;spark.app.name=testspark;spark.shuffle.useOldFetchProtocol=true'
-n mfw_hadoop --ve [...]
3. YourStrategy: `bin/beeline -u
'jdbc:hive2://xxx:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=kyuubi;zooKeeperStrategy=xxx.xxx.xxx.XxxChooseServerStrategy?spark.yarn.queue=root.kylin;spark.app.name=testspark;spark.shuffle.useOldFetchProtocol=true'
-n mfw_hadoop --verbose=true --showNestedErrs=true`
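The use cases above pass the strategy as a session variable in the JDBC URL. The sketch below shows one way such a variable could be read with a fallback to the random strategy. It assumes the parameter name `serverSelectStrategy` and the built-in names `polling` and `random` that appear in the merged diff of this commit (the commands above still use the earlier `zooKeeperStrategy` and `poll` spelling). The class `StrategyParamSketch` and its simplified parser are hypothetical and are not the driver's real URL parser.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: reads session variables from a JDBC URL; not the driver's parser. */
final class StrategyParamSketch {
  /** Returns the key=value pairs found between the first ';' and the following '?'. */
  static Map<String, String> sessionVars(String url) {
    Map<String, String> vars = new LinkedHashMap<>();
    int start = url.indexOf(';');
    if (start < 0) {
      return vars;
    }
    int end = url.indexOf('?', start);
    String section = end < 0 ? url.substring(start + 1) : url.substring(start + 1, end);
    for (String pair : section.split(";")) {
      int eq = pair.indexOf('=');
      if (eq > 0) {
        vars.put(pair.substring(0, eq), pair.substring(eq + 1));
      }
    }
    return vars;
  }

  /** Falls back to "random" when the URL does not name a strategy. */
  static String strategyName(String url) {
    return sessionVars(url).getOrDefault("serverSelectStrategy", "random");
  }

  public static void main(String[] args) {
    String base = "jdbc:hive2://xxx:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=kyuubi";
    System.out.println(strategyName(base + ";serverSelectStrategy=polling?spark.app.name=testspark"));
    System.out.println(strategyName(base));
  }
}
```

Omitting the parameter keeps the previous behavior, which matches the second form of the random use case above.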
[Result: The cluster has two servers (221, 233)]
1. poll:
1.1. zkNode: counterValue

1.2. result:




2. random:



3. YourStrategy (the test case always returns the first serverHost):



#### Related Unit Tests
A unit test, "server select strategy with zookeeper", is added to ZookeeperDiscoveryClientSuite.
---
# Checklist
- [ ] This patch was not authored or co-authored using [Generative
Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #6213 from davidyuan1223/ha_zk_support_more_strategy.
Closes #6034
961d3e989 [Bowen Liang] rename ServerStrategyFactory to
ServerSelectStrategyFactory
353f94059 [Bowen Liang] repeat
8822ad471 [Bowen Liang] repeat
619339402 [Bowen Liang] nit
e94f9e909 [Bowen Liang] nit
40f427ae5 [Bowen Liang] rename StrategyFactory to
StrategyFactoryServerStrategyFactory
7668f99cc [Bowen Liang] test name
e194ea62f [Bowen Liang] remove ZooKeeperHiveClientException from method
signature of chooseServer
265965e5d [Bowen Liang] polling
b39c56700 [Bowen Liang] style
1ab79b494 [Bowen Liang] strategyName
8f8ca28f2 [Bowen Liang] nit
228bf1091 [Bowen Liang] rename parameter zooKeeperStrategy to
serverSelectStrategy
125c82358 [Bowen Liang] rename ChooseServerStrategy to ServerSelectStrategy
b4aeb3dbd [Bowen Liang] repeat testing on pollingChooseStrategy
465548005 [davidyuan] update
09a84f1f9 [david yuan] remove the distirbuted lock
93f4a2699 [davidyuan] remove reset
7b0c1b811 [davidyuan] fix var not valid and counter getAndIncrement
c95382a23 [davidyuan] fix var not valid and counter getAndIncrement
9ed2cac85 [david yuan] remove test comment
8eddd7682 [davidyuan] Add Strategy Unit Test Case and fix the polling
strategy counter begin with 0
73952f878 [davidyuan] Kyuubi Server HA&ZK get server from serverHosts
support more strategy
97b959776 [davidyuan] Kyuubi Server HA&ZK get server from serverHosts
support more strategy
ee5a9ad68 [davidyuan] Kyuubi Server HA&ZK get server from serverHosts
support more strategy
6a0445357 [davidyuan] Kyuubi Server HA&ZK get server from serverHosts
support more strategy
1892f148d [davidyuan] add common method to get session level config
7c0c6058d [yuanfuyuan] fix_4186
Lead-authored-by: Bowen Liang <[email protected]>
Co-authored-by: davidyuan <[email protected]>
Co-authored-by: davidyuan <[email protected]>
Co-authored-by: david yuan <[email protected]>
Co-authored-by: yuanfuyuan <[email protected]>
Signed-off-by: Bowen Liang <[email protected]>
---
kyuubi-ha/pom.xml | 7 +++
.../zookeeper/ZookeeperDiscoveryClientSuite.scala | 40 +++++++++++++++-
.../kyuubi/jdbc/hive/JdbcConnectionParams.java | 1 +
.../jdbc/hive/ZooKeeperHiveClientHelper.java | 22 ++++++++-
.../jdbc/hive/strategy/ServerSelectStrategy.java | 25 ++++++++++
.../hive/strategy/ServerSelectStrategyFactory.java | 47 +++++++++++++++++++
.../hive/strategy/zk/PollingSelectStrategy.java | 53 ++++++++++++++++++++++
.../hive/strategy/zk/RandomSelectStrategy.java | 33 ++++++++++++++
8 files changed, 225 insertions(+), 3 deletions(-)
diff --git a/kyuubi-ha/pom.xml b/kyuubi-ha/pom.xml
index e1b46c8589..749ab06326 100644
--- a/kyuubi-ha/pom.xml
+++ b/kyuubi-ha/pom.xml
@@ -132,6 +132,13 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-hive-jdbc</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala
index 34ed055938..ba3d0650d1 100644
--- a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala
+++ b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/zookeeper/ZookeeperDiscoveryClientSuite.scala
@@ -34,8 +34,9 @@ import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.client._
import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient
import org.apache.kyuubi.ha.client.zookeeper.ZookeeperClientProvider._
+import org.apache.kyuubi.jdbc.hive.strategy.{ServerSelectStrategy, ServerSelectStrategyFactory}
import org.apache.kyuubi.service._
-import org.apache.kyuubi.shaded.curator.framework.CuratorFrameworkFactory
+import org.apache.kyuubi.shaded.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.kyuubi.shaded.curator.retry.ExponentialBackoffRetry
import org.apache.kyuubi.shaded.zookeeper.ZooDefs
import org.apache.kyuubi.shaded.zookeeper.data.ACL
@@ -227,4 +228,41 @@ abstract class ZookeeperDiscoveryClientSuite extends DiscoveryClientTests
discovery.stop()
}
}
+
+  test("server select strategy with zookeeper") {
+    val zkClient = CuratorFrameworkFactory.builder()
+      .connectString(getConnectString)
+      .sessionTimeoutMs(5000)
+      .retryPolicy(new ExponentialBackoffRetry(1000, 3))
+      .build
+    zkClient.start()
+
+    val namespace = "kyuubi-strategy-test"
+    val testServerHosts = Seq(
+      "testNode1",
+      "testNode2",
+      "testNode3").asJava
+    // test polling strategy
+    val pollingStrategy = ServerSelectStrategyFactory.createStrategy("polling")
+    1 to testServerHosts.size() * 2 foreach { _ =>
+      assertResult(f"testNode1")(pollingStrategy.chooseServer(testServerHosts, zkClient, namespace))
+      assertResult(f"testNode2")(pollingStrategy.chooseServer(testServerHosts, zkClient, namespace))
+      assertResult(f"testNode3")(pollingStrategy.chooseServer(testServerHosts, zkClient, namespace))
+    }
+
+    // test only get first serverHost strategy
+    val customStrategy = new ServerSelectStrategy {
+      override def chooseServer(
+          serverHosts: util.List[String],
+          zkClient: CuratorFramework,
+          namespace: String): String = serverHosts.get(0)
+    }
+    1 to testServerHosts.size() * 2 foreach { _ =>
+      assertResult("testNode1") {
+        customStrategy.chooseServer(testServerHosts, zkClient, namespace)
+      }
+    }
+
+    zkClient.close()
+  }
}
diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcConnectionParams.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcConnectionParams.java
index b3884c694f..0db99da710 100644
--- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcConnectionParams.java
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcConnectionParams.java
@@ -79,6 +79,7 @@ public class JdbcConnectionParams {
// Use ZooKeeper for indirection while using dynamic service discovery
static final String SERVICE_DISCOVERY_MODE_ZOOKEEPER = "zooKeeper";
static final String ZOOKEEPER_NAMESPACE = "zooKeeperNamespace";
+ static final String SERVER_SELECT_STRATEGY = "serverSelectStrategy";
// Default namespace value on ZooKeeper.
// This value is used if the param "zooKeeperNamespace" is not specified in the JDBC Uri.
static final String ZOOKEEPER_DEFAULT_NAMESPACE = "hiveserver2";
diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/ZooKeeperHiveClientHelper.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/ZooKeeperHiveClientHelper.java
index 948fd33346..f94bdb431e 100644
--- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/ZooKeeperHiveClientHelper.java
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/ZooKeeperHiveClientHelper.java
@@ -22,9 +22,11 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.kyuubi.jdbc.hive.strategy.ServerSelectStrategy;
+import org.apache.kyuubi.jdbc.hive.strategy.ServerSelectStrategyFactory;
+import org.apache.kyuubi.jdbc.hive.strategy.zk.RandomSelectStrategy;
import org.apache.kyuubi.shaded.curator.framework.CuratorFramework;
import org.apache.kyuubi.shaded.curator.framework.CuratorFrameworkFactory;
import org.apache.kyuubi.shaded.curator.retry.ExponentialBackoffRetry;
@@ -111,7 +113,7 @@ class ZooKeeperHiveClientHelper {
try (CuratorFramework zooKeeperClient = getZkClient(connParams)) {
List<String> serverHosts = getServerHosts(connParams, zooKeeperClient);
// Now pick a server node randomly
- String serverNode = serverHosts.get(ThreadLocalRandom.current().nextInt(serverHosts.size()));
+ String serverNode = chooseServer(connParams, serverHosts, zooKeeperClient);
updateParamsWithZKServerNode(connParams, zooKeeperClient, serverNode);
} catch (Exception e) {
throw new ZooKeeperHiveClientException(
@@ -120,6 +122,22 @@ class ZooKeeperHiveClientHelper {
// Close the client connection with ZooKeeper
}
+  private static String chooseServer(
+      JdbcConnectionParams connParams, List<String> serverHosts, CuratorFramework zkClient) {
+    String zooKeeperNamespace = getZooKeeperNamespace(connParams);
+    String strategyName =
+        connParams
+            .getSessionVars()
+            .getOrDefault(
+                JdbcConnectionParams.SERVER_SELECT_STRATEGY, RandomSelectStrategy.strategyName);
+    try {
+      ServerSelectStrategy strategy = ServerSelectStrategyFactory.createStrategy(strategyName);
+      return strategy.chooseServer(serverHosts, zkClient, zooKeeperNamespace);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to choose server with strategy " + strategyName, e);
+    }
+  }
+
static List<JdbcConnectionParams> getDirectParamsList(JdbcConnectionParams connParams)
throws ZooKeeperHiveClientException {
try (CuratorFramework zooKeeperClient = getZkClient(connParams)) {
diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/strategy/ServerSelectStrategy.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/strategy/ServerSelectStrategy.java
new file mode 100644
index 0000000000..740c357763
--- /dev/null
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/strategy/ServerSelectStrategy.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.jdbc.hive.strategy;
+
+import java.util.List;
+import org.apache.kyuubi.shaded.curator.framework.CuratorFramework;
+
+public interface ServerSelectStrategy {
+  String chooseServer(List<String> serverHosts, CuratorFramework zkClient, String namespace);
+}
diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/strategy/ServerSelectStrategyFactory.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/strategy/ServerSelectStrategyFactory.java
new file mode 100644
index 0000000000..9950097ade
--- /dev/null
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/strategy/ServerSelectStrategyFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.jdbc.hive.strategy;
+
+import java.lang.reflect.Constructor;
+import org.apache.kyuubi.jdbc.hive.strategy.zk.PollingSelectStrategy;
+import org.apache.kyuubi.jdbc.hive.strategy.zk.RandomSelectStrategy;
+
+public class ServerSelectStrategyFactory {
+  public static ServerSelectStrategy createStrategy(String strategyName) {
+    try {
+      switch (strategyName) {
+        case PollingSelectStrategy.strategyName:
+          return new PollingSelectStrategy();
+        case RandomSelectStrategy.strategyName:
+          return new RandomSelectStrategy();
+        default:
+          Class<?> clazz = Class.forName(strategyName);
+          if (ServerSelectStrategy.class.isAssignableFrom(clazz)) {
+            Constructor<? extends ServerSelectStrategy> constructor =
+                clazz.asSubclass(ServerSelectStrategy.class).getConstructor();
+            return constructor.newInstance();
+          } else {
+            throw new ClassNotFoundException(
+                "The loaded class does not implement ServerSelectStrategy");
+          }
+      }
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to init server select strategy", e);
+    }
+  }
+}
diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/strategy/zk/PollingSelectStrategy.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/strategy/zk/PollingSelectStrategy.java
new file mode 100644
index 0000000000..664c76defa
--- /dev/null
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/strategy/zk/PollingSelectStrategy.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.jdbc.hive.strategy.zk;
+
+import java.util.List;
+import org.apache.kyuubi.jdbc.hive.strategy.ServerSelectStrategy;
+import org.apache.kyuubi.shaded.curator.framework.CuratorFramework;
+import org.apache.kyuubi.shaded.curator.framework.recipes.atomic.AtomicValue;
+import org.apache.kyuubi.shaded.curator.framework.recipes.atomic.DistributedAtomicInteger;
+import org.apache.kyuubi.shaded.curator.retry.RetryForever;
+
+public class PollingSelectStrategy implements ServerSelectStrategy {
+  public static final String strategyName = "polling";
+
+  private static final String COUNTER_PATH_PREFIX = "/";
+  private static final String COUNTER_PATH_SUFFIX = "-counter";
+
+  @Override
+  public String chooseServer(
+      List<String> serverHosts, CuratorFramework zkClient, String namespace) {
+    String counterPath = COUNTER_PATH_PREFIX + namespace + COUNTER_PATH_SUFFIX;
+    try {
+      return serverHosts.get(getAndIncrement(zkClient, counterPath) % serverHosts.size());
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to choose server by polling select strategy", e);
+    }
+  }
+
+  private int getAndIncrement(CuratorFramework zkClient, String path) throws Exception {
+    DistributedAtomicInteger dai =
+        new DistributedAtomicInteger(zkClient, path, new RetryForever(3000));
+    AtomicValue<Integer> atomicVal;
+    do {
+      atomicVal = dai.add(1);
+    } while (atomicVal == null || !atomicVal.succeeded());
+    return atomicVal.preValue();
+  }
+}
diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/strategy/zk/RandomSelectStrategy.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/strategy/zk/RandomSelectStrategy.java
new file mode 100644
index 0000000000..f42fd52948
--- /dev/null
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/strategy/zk/RandomSelectStrategy.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.jdbc.hive.strategy.zk;
+
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.kyuubi.jdbc.hive.strategy.ServerSelectStrategy;
+import org.apache.kyuubi.shaded.curator.framework.CuratorFramework;
+
+public class RandomSelectStrategy implements ServerSelectStrategy {
+  public static final String strategyName = "random";
+
+  @Override
+  public String chooseServer(
+      List<String> serverHosts, CuratorFramework zkClient, String namespace) {
+    return serverHosts.get(ThreadLocalRandom.current().nextInt(serverHosts.size()));
+  }
+}
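
The default branch of ServerSelectStrategyFactory in the diff above loads a user-defined strategy by class name. The following is a simplified, standard-library-only sketch of that idea. The interface `HostStrategy`, the class `FirstHost`, and the method `load` are hypothetical names introduced here for illustration; the real interface also receives a ZooKeeper client and a namespace, and the real factory reports a non-matching class differently.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of loading a strategy by class name; not the code from this commit. */
final class StrategyLoaderSketch {
  /** Simplified contract assumed for this sketch. */
  interface HostStrategy {
    String choose(List<String> serverHosts);
  }

  /** Example user-defined strategy that always returns the first host. */
  public static final class FirstHost implements HostStrategy {
    public FirstHost() {}

    @Override
    public String choose(List<String> serverHosts) {
      return serverHosts.get(0);
    }
  }

  /** Loads the named class and instantiates it through its no-argument constructor. */
  static HostStrategy load(String className) {
    try {
      Class<?> clazz = Class.forName(className);
      if (!HostStrategy.class.isAssignableFrom(clazz)) {
        throw new IllegalArgumentException(className + " does not implement HostStrategy");
      }
      return clazz.asSubclass(HostStrategy.class).getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Failed to load strategy " + className, e);
    }
  }

  public static void main(String[] args) {
    HostStrategy strategy = load(FirstHost.class.getName());
    System.out.println(strategy.choose(Arrays.asList("testNode1", "testNode2")));
  }
}
```

The sketch checks the type before instantiating, so a class on the classpath that does not implement the contract is rejected without running its constructor.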