This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 9e1f9c2 [KYUUBI #1359] Support setting zk chroot path when initialize
cluster
9e1f9c2 is described below
commit 9e1f9c2e982f0b59da20b58476c6fac15de91c32
Author: sunfangbin <[email protected]>
AuthorDate: Fri Nov 12 12:04:35 2021 +0800
[KYUUBI #1359] Support setting zk chroot path when initialize cluster
### _Why are the changes needed?_
Users may encounter the following exception when setting
`kyuubi.ha.zookeeper.quorum` to `localhost:2181/lakehouse` while the chroot
path `/lakehouse` does not yet exist:
```
2021-11-10 10:56:34.510 ERROR server.KyuubiThriftBinaryFrontendService:
Error starting service KyuubiServiceDiscovery
org.apache.kyuubi.KyuubiException: Failed to create namespace '/kyuubi'
at
org.apache.kyuubi.ha.client.ServiceDiscovery$.createServiceNode(ServiceDiscovery.scala:225)
at
org.apache.kyuubi.ha.client.ServiceDiscovery.start(ServiceDiscovery.scala:101)
......
Caused by: org.apache.zookeeper.KeeperException$NoNodeException:
KeeperErrorCode = NoNode for /kyuubi
at
org.apache.zookeeper.KeeperException.create(KeeperException.java:114)
......
```
It is wonderful to support this, since a zookeeper connection with a chroot
path is generally recommended in production environments. With this feature,
the znodes in zookeeper look like the following:
```
[zk: localhost:2181(CONNECTED) 28] ls /lakehouse
[kyuubi, kyuubi_USER, kyuubi_USER_SPARK_SQL]
```
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #1359 from murong00/branch-1358.
Closes #1359
42459ba0 [sunfangbin] Add a test to cover the changes
0191ab66 [sunfangbin] Fail the invalid cases
d04c532b [sunfangbin] address all the comments
8e7b5a64 [sunfangbin] Support setting zk chroot path when initialize cluster
Authored-by: sunfangbin <[email protected]>
Signed-off-by: ulysses-you <[email protected]>
---
.../org/apache/kyuubi/server/KyuubiServer.scala | 44 ++++++++++++++
.../apache/kyuubi/server/KyuubiServerSuite.scala | 67 +++++++++++++++++++++-
2 files changed, 110 insertions(+), 1 deletion(-)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
index 8eb28db..f27d0df 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
@@ -17,9 +17,15 @@
package org.apache.kyuubi.server
+import java.util
+
import scala.util.Properties
+import org.apache.curator.utils.ZKPaths
import org.apache.hadoop.security.UserGroupInformation
+import org.apache.zookeeper.CreateMode.PERSISTENT
+import org.apache.zookeeper.KeeperException
+import org.apache.zookeeper.KeeperException.NodeExistsException
import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
@@ -27,6 +33,7 @@ import
org.apache.kyuubi.config.KyuubiConf.{FRONTEND_PROTOCOLS, FrontendProtocol
import org.apache.kyuubi.config.KyuubiConf.FrontendProtocols._
import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.client.{ServiceDiscovery, ZooKeeperAuthTypes}
+import org.apache.kyuubi.ha.client.ZooKeeperClientProvider._
import org.apache.kyuubi.metrics.{MetricsConf, MetricsSystem}
import org.apache.kyuubi.service.{AbstractBackendService,
AbstractFrontendService, Serverable}
import org.apache.kyuubi.util.{KyuubiHadoopUtils, SignalRegister}
@@ -42,6 +49,43 @@ object KyuubiServer extends Logging {
zkServer.start()
conf.set(HA_ZK_QUORUM, zkServer.getConnectString)
conf.set(HA_ZK_AUTH_TYPE, ZooKeeperAuthTypes.NONE.toString)
+ } else {
+ // create chroot path if necessary
+ val connectionStr = conf.get(HA_ZK_QUORUM)
+ val addresses = connectionStr.split(",")
+ val slashOption = util.Arrays.copyOfRange(addresses, 0, addresses.length
-1)
+ .toList
+ .find(_.contains("/"))
+ if (slashOption.isDefined) {
+ throw new IllegalArgumentException(s"Illegal zookeeper quorum
'$connectionStr', " +
+ s"the chroot path started with / is only allowed at the end!")
+ }
+ val chrootIndex = connectionStr.indexOf("/")
+ val chrootOption = {
+ if (chrootIndex > 0) Some(connectionStr.substring(chrootIndex))
+ else None
+ }
+ chrootOption.foreach { chroot =>
+ val zkConnectionForChrootCreation = connectionStr.substring(0,
chrootIndex)
+ val overrideQuorumConf = conf.clone.set(HA_ZK_QUORUM,
zkConnectionForChrootCreation)
+ withZkClient(overrideQuorumConf) { zkClient =>
+ if (zkClient.checkExists().forPath(chroot) == null) {
+ val chrootPath = ZKPaths.makePath(null, chroot)
+ try {
+ zkClient
+ .create()
+ .creatingParentsIfNeeded()
+ .withMode(PERSISTENT)
+ .forPath(chrootPath)
+ } catch {
+ case _: NodeExistsException => // do nothing
+ case e: KeeperException =>
+ throw new KyuubiException(s"Failed to create chroot path
'$chrootPath'", e)
+ }
+ }
+ }
+ info(s"Created zookeeper chroot path $chroot")
+ }
}
val server = new KyuubiServer()
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/KyuubiServerSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/KyuubiServerSuite.scala
index f749b05..241c8dc 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/KyuubiServerSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/KyuubiServerSuite.scala
@@ -17,12 +17,43 @@
package org.apache.kyuubi.server
-import org.apache.kyuubi.{KyuubiException, KyuubiFunSuite}
+import org.apache.kyuubi.{KyuubiException, KyuubiFunSuite, Utils}
import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.ha.HighAvailabilityConf
+import org.apache.kyuubi.ha.client.ZooKeeperClientProvider
import org.apache.kyuubi.service.ServiceState._
+import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}
class KyuubiServerSuite extends KyuubiFunSuite {
+ private var zkServer: EmbeddedZookeeper = _
+ private var server: KyuubiServer = _
+
+ override def beforeAll(): Unit = {
+ val conf = KyuubiConf()
+ zkServer = new EmbeddedZookeeper()
+ conf.set(ZookeeperConf.ZK_CLIENT_PORT, 0)
+ val zkData = Utils.createTempDir()
+ conf.set(ZookeeperConf.ZK_DATA_DIR, zkData.toString)
+ zkServer.initialize(conf)
+ zkServer.start()
+ super.beforeAll()
+ Thread.sleep(1000)
+ }
+
+ override def afterAll(): Unit = {
+ if (server != null) {
+ server.stop()
+ server = null
+ }
+
+ if (zkServer != null) {
+ zkServer.stop()
+ zkServer = null
+ }
+ super.afterAll()
+ }
+
test("kyuubi server basic") {
val server = new KyuubiServer()
server.stop()
@@ -65,4 +96,38 @@ class KyuubiServerSuite extends KyuubiFunSuite {
assert(e.getMessage.contains("Failed to initialize frontend service"))
assert(e.getCause.getMessage === "Invalid Port number")
}
+
+ test("invalid zookeeper quorum") {
+ val conf = KyuubiConf()
+ val quorum1 = "localhost:2181/lake,localhost:2182/lake"
+ conf.set(HighAvailabilityConf.HA_ZK_QUORUM, quorum1)
+ val exp1 =
intercept[IllegalArgumentException](KyuubiServer.startServer(conf))
+ assert(exp1.getMessage === s"Illegal zookeeper quorum '$quorum1', " +
+ s"the chroot path started with / is only allowed at the end!")
+
+ val quorum2 = "localhost:2181/lake,localhost:2182"
+ conf.set(HighAvailabilityConf.HA_ZK_QUORUM, quorum2)
+ val exp2 =
intercept[IllegalArgumentException](KyuubiServer.startServer(conf))
+ assert(exp2.getMessage === s"Illegal zookeeper quorum '$quorum2', " +
+ s"the chroot path started with / is only allowed at the end!")
+ }
+
+ test("kyuubi server starts with chroot") {
+ val conf = KyuubiConf()
+ val zkConnection = zkServer.getConnectString
+ val chrootPath = "/lake"
+ conf.set(HighAvailabilityConf.HA_ZK_QUORUM, zkConnection)
+ // chroot path does not exist before server start
+ ZooKeeperClientProvider.withZkClient(conf) { client =>
+ assert(client.checkExists().forPath(chrootPath) == null)
+ }
+
+ val zkWithChroot = zkConnection + chrootPath
+ val chrootConf = conf.clone.set(HighAvailabilityConf.HA_ZK_QUORUM,
zkWithChroot)
+ server = KyuubiServer.startServer(chrootConf)
+ // chroot path exists after server started
+ ZooKeeperClientProvider.withZkClient(conf) { client =>
+ assert(client.checkExists().forPath(chrootPath) != null)
+ }
+ }
}