This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 1061d17 [KYUUBI #1358] Add KyuubiMySQLFrontendService stub
1061d17 is described below
commit 1061d176c832967c57be60fb4c908191ddc2dfd9
Author: Cheng Pan <[email protected]>
AuthorDate: Thu Nov 11 16:50:52 2021 +0800
[KYUUBI #1358] Add KyuubiMySQLFrontendService stub
### _Why are the changes needed?_
Add KyuubiMySQLFrontendService stub, without Netty pipeline, part of #1334
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #1358 from pan3793/mysql-fe-stub.
Closes #1358
c091d955 [Cheng Pan] nit
61abb0f6 [Cheng Pan] Address comments
9ac70456 [Cheng Pan] Update conf
3d75acdf [Cheng Pan] nit
4a4d8a24 [Cheng Pan] Address comments
0136dd52 [Cheng Pan] nit
477474ff [Cheng Pan] KyuubiMySQLFrontendService Stub
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
conf/log4j.properties.template | 3 +
docs/deployment/settings.md | 9 ++
kyuubi-common/pom.xml | 5 +
.../org/apache/kyuubi/config/KyuubiConf.scala | 69 ++++++++---
.../scala/org/apache/kyuubi/util/NettyUtils.scala | 71 ++++++++++++
kyuubi-server/pom.xml | 5 -
.../kyuubi/server/KyuubiMySQLFrontendService.scala | 126 +++++++++++++++++++++
.../kyuubi/server/KyuubiRestFrontendService.scala | 12 +-
kyuubi-server/src/test/resources/log4j.properties | 3 +
.../org/apache/kyuubi/RestFrontendTestHelper.scala | 2 +-
.../server/KyuubiMySQLFrontendServiceSuite.scala | 56 +++++++++
.../kyuubi/service/NoopMySQLFrontendServer.scala | 25 ++++
12 files changed, 361 insertions(+), 25 deletions(-)
diff --git a/conf/log4j.properties.template b/conf/log4j.properties.template
index 4543adb..82cf9b1 100644
--- a/conf/log4j.properties.template
+++ b/conf/log4j.properties.template
@@ -25,3 +25,6 @@ log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd
HH:mm:ss.SSS} %p %
# Set the default kyuubi-ctl log level to WARN. When running the kyuubi-ctl,
the
# log level for this class is used to overwrite the root logger's log level.
log4j.logger.org.apache.kyuubi.ctl.ServiceControlCli=ERROR
+
+# Uncomment the next line to trace MySQL frontend protocol traffic
+# log4j.logger.org.apache.kyuubi.server.mysql.codec=TRACE
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 8ac415f..1e8fe97 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -205,6 +205,12 @@ kyuubi\.frontend\.login<br>\.timeout|<div style='width:
65pt;word-wrap: break-wo
kyuubi\.frontend\.max<br>\.message\.size|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'>104857600</div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>(deprecated) Maximum message
size in bytes a Kyuubi server will accept.</div>|<div style='width:
30pt'>int</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.frontend\.max<br>\.worker\.threads|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'>999</div>|<div style='width: 170pt;word-wrap:
break-word;white-space: normal'>(deprecated) Maximum number of threads in the
of frontend worker thread pool for the thrift frontend service</div>|<div
style='width: 30pt'>int</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.frontend\.min<br>\.worker\.threads|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'>9</div>|<div style='width: 170pt;word-wrap:
break-word;white-space: normal'>(deprecated) Minimum number of threads in the
of frontend worker thread pool for the thrift frontend service</div>|<div
style='width: 30pt'>int</div>|<div style='width: 20pt'>1.0.0</div>
+kyuubi\.frontend\.mysql<br>\.bind\.host|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'><undefined></div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>Hostname or IP of the machine
on which to run the MySQL frontend service.</div>|<div style='width:
30pt'>string</div>|<div style='width: 20pt'>1.4.0</div>
+kyuubi\.frontend\.mysql<br>\.bind\.port|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'>3309</div>|<div style='width: 170pt;word-wrap:
break-word;white-space: normal'>Port of the machine on which to run the MySQL
frontend service.</div>|<div style='width: 30pt'>int</div>|<div style='width:
20pt'>1.4.0</div>
+kyuubi\.frontend\.mysql<br>\.max\.worker\.threads|<div style='width:
65pt;word-wrap: break-word;white-space: normal'>999</div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>Maximum number of threads in
the command execution thread pool for the MySQL frontend service</div>|<div
style='width: 30pt'>int</div>|<div style='width: 20pt'>1.4.0</div>
+kyuubi\.frontend\.mysql<br>\.min\.worker\.threads|<div style='width:
65pt;word-wrap: break-word;white-space: normal'>9</div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>Minimum number of threads in
the command execution thread pool for the MySQL frontend service</div>|<div
style='width: 30pt'>int</div>|<div style='width: 20pt'>1.4.0</div>
+kyuubi\.frontend\.mysql<br>\.netty\.worker\.threads|<div style='width:
65pt;word-wrap: break-word;white-space: normal'><undefined></div>|<div
style='width: 170pt;word-wrap: break-word;white-space: normal'>Number of thread
in the netty worker event loop of MySQL frontend service. Use min(cpu_cores, 8)
in default.</div>|<div style='width: 30pt'>int</div>|<div style='width:
20pt'>1.4.0</div>
+kyuubi\.frontend\.mysql<br>\.worker\.keepalive\.time|<div style='width:
65pt;word-wrap: break-word;white-space: normal'>PT1M</div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>Time(ms) that an idle async
thread of the command execution thread pool will wait for a new task to arrive
before terminating in MySQL frontend service</div>|<div style='width:
30pt'>duration</div>|<div style='width: 20pt'>1.4.0</div>
kyuubi\.frontend<br>\.protocols|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'>THRIFT_BINARY</div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>A comma separated list for all
frontend protocols <ul> <li>THRIFT_BINARY - HiveServer2 compatible thrift
binary protocol.</li> <li>REST - Kyuubi defined REST API(experimental).</li>
</ul></div>|<div style='width: 30pt'>seq</div>|<div style='width:
20pt'>1.4.0</div>
kyuubi\.frontend\.rest<br>\.bind\.host|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'><undefined></div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>Hostname or IP of the machine
on which to run the REST frontend service.</div>|<div style='width:
30pt'>string</div>|<div style='width: 20pt'>1.4.0</div>
kyuubi\.frontend\.rest<br>\.bind\.port|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'>10099</div>|<div style='width: 170pt;word-wrap:
break-word;white-space: normal'>Port of the machine on which to run the REST
frontend service.</div>|<div style='width: 30pt'>int</div>|<div style='width:
20pt'>1.4.0</div>
@@ -373,6 +379,9 @@
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %p %
# Set the default kyuubi-ctl log level to WARN. When running the kyuubi-ctl,
the
# log level for this class is used to overwrite the root logger's log level.
log4j.logger.org.apache.kyuubi.ctl.ServiceControlCli=ERROR
+
+# Uncomment the next line to trace MySQL frontend protocol traffic
+# log4j.logger.org.apache.kyuubi.server.mysql.codec=TRACE
```
## Other Configurations
diff --git a/kyuubi-common/pom.xml b/kyuubi-common/pom.xml
index 02988bd..90923f9 100644
--- a/kyuubi-common/pom.xml
+++ b/kyuubi-common/pom.xml
@@ -99,6 +99,11 @@
</dependency>
<dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minikdc</artifactId>
<scope>test</scope>
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index f738642..0a3eac3 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -27,6 +27,7 @@ import scala.collection.JavaConverters._
import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.engine.{EngineType, ShareLevel}
import org.apache.kyuubi.service.authentication.{AuthTypes, SaslQOP}
+import org.apache.kyuubi.util.NettyUtils.MAX_NETTY_THREADS
case class KyuubiConf(loadSysDefault: Boolean = true) extends Logging {
import KyuubiConf._
@@ -136,6 +137,8 @@ case class KyuubiConf(loadSysDefault: Boolean = true)
extends Logging {
FRONTEND_THRIFT_BINARY_BIND_PORT,
FRONTEND_REST_BIND_HOST,
FRONTEND_REST_BIND_PORT,
+ FRONTEND_MYSQL_BIND_HOST,
+ FRONTEND_MYSQL_BIND_PORT,
AUTHENTICATION_METHOD,
KINIT_INTERVAL)
@@ -274,7 +277,6 @@ object KyuubiConf {
s"the frontend protocol should be one or more of
${FrontendProtocols.values.mkString(",")}")
.createWithDefault(Seq(FrontendProtocols.THRIFT_BINARY.toString))
- @deprecated(s"using ${FRONTEND_THRIFT_BINARY_BIND_HOST.key} instead",
"1.4.0")
val FRONTEND_BIND_HOST: OptionalConfigEntry[String] =
buildConf("frontend.bind.host")
.doc("(deprecated) Hostname or IP of the machine on which to run the
thrift frontend service " +
"via binary protocol.")
@@ -300,11 +302,10 @@ object KyuubiConf {
val FRONTEND_THRIFT_BINARY_BIND_PORT: ConfigEntry[Int] =
buildConf("frontend.thrift.binary.bind.port")
- .doc("Port of the machine on which to run the thrift frontend service via
binary protocol.")
- .version("1.4.0")
- .fallbackConf(FRONTEND_BIND_PORT)
+ .doc("Port of the machine on which to run the thrift frontend service
via binary protocol.")
+ .version("1.4.0")
+ .fallbackConf(FRONTEND_BIND_PORT)
- @deprecated(s"using ${FRONTEND_THRIFT_MIN_WORKER_THREADS.key} instead",
"1.4.0")
val FRONTEND_MIN_WORKER_THREADS: ConfigEntry[Int] =
buildConf("frontend.min.worker.threads")
.doc("(deprecated) Minimum number of threads in the of frontend worker
thread pool for " +
"the thrift frontend service")
@@ -319,7 +320,6 @@ object KyuubiConf {
.version("1.4.0")
.fallbackConf(FRONTEND_MIN_WORKER_THREADS)
- @deprecated(s"using ${FRONTEND_THRIFT_MAX_WORKER_THREADS.key} instead",
"1.4.0")
val FRONTEND_MAX_WORKER_THREADS: ConfigEntry[Int] =
buildConf("frontend.max.worker.threads")
.doc("(deprecated) Maximum number of threads in the of frontend worker
thread pool for " +
"the thrift frontend service")
@@ -334,7 +334,6 @@ object KyuubiConf {
.version("1.4.0")
.fallbackConf(FRONTEND_MAX_WORKER_THREADS)
- @deprecated(s"using ${FRONTEND_THRIFT_WORKER_KEEPALIVE_TIME.key} instead",
"1.4.0")
val FRONTEND_WORKER_KEEPALIVE_TIME: ConfigEntry[Long] =
buildConf("frontend.worker.keepalive.time")
.doc("(deprecated) Keep-alive time (in milliseconds) for an idle worker
thread")
@@ -348,7 +347,7 @@ object KyuubiConf {
.version("1.4.0")
.fallbackConf(FRONTEND_WORKER_KEEPALIVE_TIME)
- @deprecated(s"using ${FRONTEND_THRIFT_WORKER_KEEPALIVE_TIME.key} instead",
"1.4.0")
+ @deprecated(s"using ${FRONTEND_THRIFT_MAX_MESSAGE_SIZE.key} instead",
"1.4.0")
val FRONTEND_MAX_MESSAGE_SIZE: ConfigEntry[Int] =
buildConf("frontend.max.message.size")
.doc("(deprecated) Maximum message size in bytes a Kyuubi server will
accept.")
@@ -486,11 +485,11 @@ object KyuubiConf {
.transform(_.toLowerCase(Locale.ROOT))
.createWithDefault(SaslQOP.AUTH.toString)
- val FRONTEND_REST_BIND_HOST: OptionalConfigEntry[String] =
buildConf("frontend.rest.bind.host")
- .doc("Hostname or IP of the machine on which to run the REST frontend
service.")
- .version("1.4.0")
- .stringConf
- .createOptional
+ val FRONTEND_REST_BIND_HOST: ConfigEntry[Option[String]] =
+ buildConf("frontend.rest.bind.host")
+ .doc("Hostname or IP of the machine on which to run the REST frontend
service.")
+ .version("1.4.0")
+ .fallbackConf(FRONTEND_BIND_HOST)
val FRONTEND_REST_BIND_PORT: ConfigEntry[Int] =
buildConf("frontend.rest.bind.port")
.doc("Port of the machine on which to run the REST frontend service.")
@@ -499,6 +498,50 @@ object KyuubiConf {
.checkValue(p => p == 0 || (p > 1024 && p < 65535), "Invalid Port number")
.createWithDefault(10099)
+ val FRONTEND_MYSQL_BIND_HOST: ConfigEntry[Option[String]] =
+ buildConf("frontend.mysql.bind.host")
+ .doc("Hostname or IP of the machine on which to run the MySQL frontend
service.")
+ .version("1.4.0")
+ .fallbackConf(FRONTEND_BIND_HOST)
+
+ val FRONTEND_MYSQL_BIND_PORT: ConfigEntry[Int] =
buildConf("frontend.mysql.bind.port")
+ .doc("Port of the machine on which to run the MySQL frontend service.")
+ .version("1.4.0")
+ .intConf
+ .checkValue(p => p == 0 || (p > 1024 && p < 65535), "Invalid Port number")
+ .createWithDefault(3309)
+
+ val FRONTEND_MYSQL_NETTY_WORKER_THREADS: OptionalConfigEntry[Int] =
+ buildConf("frontend.mysql.netty.worker.threads")
+ .doc("Number of thread in the netty worker event loop of MySQL frontend
service. " +
+ s"Use min(cpu_cores, $MAX_NETTY_THREADS) in default.")
+ .version("1.4.0")
+ .intConf
+ .checkValue(n => n > 0 && n <= MAX_NETTY_THREADS,
+ s"Invalid thread number, must in (0, $MAX_NETTY_THREADS]")
+ .createOptional
+
+ val FRONTEND_MYSQL_MIN_WORKER_THREADS: ConfigEntry[Int] =
+ buildConf("frontend.mysql.min.worker.threads")
+ .doc("Minimum number of threads in the command execution thread pool for
the MySQL " +
+ "frontend service")
+ .version("1.4.0")
+ .fallbackConf(FRONTEND_MIN_WORKER_THREADS)
+
+ val FRONTEND_MYSQL_MAX_WORKER_THREADS: ConfigEntry[Int] =
+ buildConf("frontend.mysql.max.worker.threads")
+ .doc("Maximum number of threads in the command execution thread pool for
the MySQL " +
+ "frontend service")
+ .version("1.4.0")
+ .fallbackConf(FRONTEND_MAX_WORKER_THREADS)
+
+ val FRONTEND_MYSQL_WORKER_KEEPALIVE_TIME: ConfigEntry[Long] =
+ buildConf("frontend.mysql.worker.keepalive.time")
+ .doc("Time(ms) that an idle async thread of the command execution thread
pool will wait" +
+ " for a new task to arrive before terminating in MySQL frontend
service")
+ .version("1.4.0")
+ .fallbackConf(FRONTEND_WORKER_KEEPALIVE_TIME)
+
/////////////////////////////////////////////////////////////////////////////////////////////////
// SQL Engine Configuration
//
/////////////////////////////////////////////////////////////////////////////////////////////////
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/NettyUtils.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/NettyUtils.scala
new file mode 100644
index 0000000..c5b6e2e
--- /dev/null
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/NettyUtils.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.util
+
+import java.util.concurrent.ThreadFactory
+
+import scala.math.min
+
+import io.netty.channel._
+import io.netty.channel.epoll._
+import io.netty.channel.nio.NioEventLoopGroup
+import io.netty.channel.socket.nio.{NioServerSocketChannel, NioSocketChannel}
+import io.netty.util.concurrent.DefaultThreadFactory
+
+object NettyUtils {
+
+ /**
+ * Specifies an upper bound on the number of Netty threads that Kyuubi
requires by default.
+ * In practice, only 2-4 cores should be required to transfer roughly 10
Gb/s, and each core
+ * that we use will have an initial overhead of roughly 32 MB of off-heap
memory, which comes
+ * at a premium.
+ *
+ * Thus, this value should still retain maximum throughput and reduce wasted
off-heap memory
+ * allocation.
+ */
+ val MAX_NETTY_THREADS: Int = 8
+
+ val EPOLL_MODE: Boolean = Epoll.isAvailable
+
+ val CLIENT_CHANNEL_CLASS: Class[_ <: Channel] =
+ if (EPOLL_MODE) classOf[EpollSocketChannel] else classOf[NioSocketChannel]
+
+ val SERVER_CHANNEL_CLASS: Class[_ <: ServerChannel] =
+ if (EPOLL_MODE) classOf[EpollServerSocketChannel] else
classOf[NioServerSocketChannel]
+
+ def createThreadFactory(threadPoolPrefix: String): ThreadFactory =
+ new DefaultThreadFactory(threadPoolPrefix, true)
+
+ def createEventLoop(numThreads: Int, threadPrefix: String): EventLoopGroup =
{
+ val threadFactory = createThreadFactory(threadPrefix)
+ if (EPOLL_MODE) {
+ new EpollEventLoopGroup(numThreads, threadFactory)
+ } else {
+ new NioEventLoopGroup(numThreads, threadFactory)
+ }
+ }
+
+ /**
+ * Returns the default number of threads for the Netty thread pools. If
numUsableCores is absent,
+ * we will use Runtime to get an approximate number of available cores.
+ */
+ def defaultNumThreads(numUsableCores: Option[Int]): Int = numUsableCores
match {
+ case Some(num) => min(num, MAX_NETTY_THREADS)
+ case None => min(sys.runtime.availableProcessors, MAX_NETTY_THREADS)
+ }
+}
diff --git a/kyuubi-server/pom.xml b/kyuubi-server/pom.xml
index 821bde6..ecf336b 100644
--- a/kyuubi-server/pom.xml
+++ b/kyuubi-server/pom.xml
@@ -203,11 +203,6 @@
</dependency>
<dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-all</artifactId>
- </dependency>
-
- <dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-server</artifactId>
</dependency>
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiMySQLFrontendService.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiMySQLFrontendService.scala
new file mode 100644
index 0000000..5c2800d
--- /dev/null
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiMySQLFrontendService.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server
+
+import java.net.{InetAddress, InetSocketAddress}
+import java.util.concurrent.{ThreadPoolExecutor, TimeUnit}
+
+import io.netty.bootstrap.ServerBootstrap
+import io.netty.buffer.PooledByteBufAllocator
+import io.netty.channel.{ChannelFuture, ChannelInitializer, ChannelOption}
+import io.netty.channel.socket.SocketChannel
+import io.netty.handler.logging.{LoggingHandler, LogLevel}
+
+import org.apache.kyuubi.{KyuubiException, Logging, Utils}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.service.{AbstractFrontendService, Serverable, Service}
+import org.apache.kyuubi.util.ExecutorPoolCaptureOom
+import org.apache.kyuubi.util.NettyUtils._
+
+/**
+ * A frontend service that implements the MySQL protocol.
+ */
+class KyuubiMySQLFrontendService(override val serverable: Serverable)
+ extends AbstractFrontendService("KyuubiMySQLFrontendService") with Logging {
+
+ private var execPool: ThreadPoolExecutor = _
+
+ private var serverAddr: InetAddress = _
+ private var port: Int = _
+ private var bootstrap: ServerBootstrap = _
+ private var bindFuture: ChannelFuture = _
+
+ @volatile protected var isStarted = false
+
+ protected def oomHook: Runnable = () => serverable.stop()
+
+ override def initialize(conf: KyuubiConf): Unit = synchronized {
+ val minThreads = conf.get(FRONTEND_MYSQL_MIN_WORKER_THREADS)
+ val maxThreads = conf.get(FRONTEND_MYSQL_MAX_WORKER_THREADS)
+ val keepAliveMs = conf.get(FRONTEND_MYSQL_WORKER_KEEPALIVE_TIME)
+ execPool = ExecutorPoolCaptureOom(
+ "mysql-exec-pool",
+ minThreads, maxThreads,
+ keepAliveMs,
+ oomHook)
+
+ serverAddr = conf.get(FRONTEND_MYSQL_BIND_HOST)
+ .map(InetAddress.getByName)
+ .getOrElse(Utils.findLocalInetAddress)
+ port = conf.get(FRONTEND_MYSQL_BIND_PORT)
+ val workerThreads =
defaultNumThreads(conf.get(FRONTEND_MYSQL_NETTY_WORKER_THREADS))
+ val bossGroup = createEventLoop(1, "mysql-netty-boss")
+ val workerGroup = createEventLoop(workerThreads, "mysql-netty-worker")
+ bootstrap = new ServerBootstrap()
+ .group(bossGroup, workerGroup)
+ .channel(SERVER_CHANNEL_CLASS)
+ .option(ChannelOption.SO_BACKLOG, Int.box(128))
+ .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+ .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+ .childOption(ChannelOption.TCP_NODELAY, Boolean.box(true))
+ .childHandler(new ChannelInitializer[SocketChannel] {
+ override def initChannel(channel: SocketChannel): Unit =
channel.pipeline
+ .addLast(new LoggingHandler("org.apache.kyuubi.server.mysql.codec",
LogLevel.TRACE))
+ // TODO implement authentication, codec, command handler
+ })
+ super.initialize(conf)
+ }
+
+ override def connectionUrl: String = {
+ checkInitialized()
+ s"${serverAddr.getCanonicalHostName}:$port"
+ }
+
+ override def start(): Unit = synchronized {
+ if (!isStarted) {
+ try {
+ bindFuture = bootstrap.bind(serverAddr, port)
+ bindFuture.syncUninterruptibly
+ port =
bindFuture.channel.localAddress.asInstanceOf[InetSocketAddress].getPort
+ isStarted = true
+ info(s"MySQL frontend service has started at $connectionUrl.")
+ } catch {
+ case rethrow: Exception =>
+ throw new KyuubiException("Cannot start MySQL frontend service Netty
server", rethrow)
+ }
+ }
+ super.start()
+ }
+
+ override def stop(): Unit = synchronized {
+ if (isStarted) {
+ if (bindFuture != null) {
+ // close is a local operation and should finish within milliseconds;
timeout just to be safe
+ bindFuture.channel.close.awaitUninterruptibly(10, TimeUnit.SECONDS)
+ bindFuture = null
+ }
+ if (bootstrap != null && bootstrap.config.group != null) {
+ bootstrap.config.group.shutdownGracefully
+ }
+ if (bootstrap != null && bootstrap.config.childGroup != null) {
+ bootstrap.config.childGroup.shutdownGracefully
+ }
+ bootstrap = null
+ isStarted = false
+ }
+ super.stop()
+ }
+
+ override val discoveryService: Option[Service] = None
+}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
index 8b87044..f620a33 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
@@ -34,12 +34,12 @@ import org.apache.kyuubi.service.{AbstractFrontendService,
Serverable, Service}
* Note: Currently, it only be used in the Kyuubi Server side.
*/
class KyuubiRestFrontendService(override val serverable: Serverable)
- extends AbstractFrontendService("RestFrontendService") with Logging {
+ extends AbstractFrontendService("KyuubiRestFrontendService") with Logging {
- var serverAddr: InetAddress = _
- var portNum: Int = _
- var jettyServer: Server = _
- var connector: ServerConnector = _
+ private var serverAddr: InetAddress = _
+ private var portNum: Int = _
+ private var jettyServer: Server = _
+ private var connector: ServerConnector = _
@volatile protected var isStarted = false
@@ -84,13 +84,13 @@ class KyuubiRestFrontendService(override val serverable:
Serverable)
try {
connector.start()
jettyServer.start()
+ isStarted = true
info(s"Rest frontend service jetty server has started at
${jettyServer.getURI}.")
} catch {
case rethrow: Exception =>
stopHttpServer()
throw new KyuubiException("Cannot start rest frontend service jetty
server", rethrow)
}
- isStarted = true
}
super.start()
diff --git a/kyuubi-server/src/test/resources/log4j.properties
b/kyuubi-server/src/test/resources/log4j.properties
index 958c9c8..06881c9 100644
--- a/kyuubi-server/src/test/resources/log4j.properties
+++ b/kyuubi-server/src/test/resources/log4j.properties
@@ -18,6 +18,9 @@
# Set everything to be logged to the file target/unit-tests.log
log4j.rootLogger=INFO, CA, FA
+# Uncomment the next line to trace MySQL frontend protocol traffic
+# log4j.logger.org.apache.kyuubi.server.mysql.codec=TRACE
+
#Console Appender
log4j.appender.CA=org.apache.log4j.ConsoleAppender
log4j.appender.CA.layout=org.apache.log4j.PatternLayout
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/RestFrontendTestHelper.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/RestFrontendTestHelper.scala
index d222bb8..8c87752 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/RestFrontendTestHelper.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/RestFrontendTestHelper.scala
@@ -57,7 +57,7 @@ trait RestFrontendTestHelper {
val server = new NoopRestFrontendServer()
server.stop()
val conf = KyuubiConf()
- conf.set(KyuubiConf.FRONTEND_REST_BIND_HOST, restFrontendHost)
+ conf.set(KyuubiConf.FRONTEND_REST_BIND_HOST, Some(restFrontendHost))
server.initialize(conf)
server.start()
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/KyuubiMySQLFrontendServiceSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/KyuubiMySQLFrontendServiceSuite.scala
new file mode 100644
index 0000000..735863b
--- /dev/null
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/KyuubiMySQLFrontendServiceSuite.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.service.NoopMySQLFrontendServer
+import org.apache.kyuubi.service.ServiceState._
+
+class KyuubiMySQLFrontendServiceSuite extends KyuubiFunSuite {
+
+ test("Kyuubi MySQL frontend service basic") {
+ val server = new NoopMySQLFrontendServer
+ server.stop()
+ val conf = KyuubiConf()
+ assert(server.getServices.isEmpty)
+ assert(server.getServiceState === LATENT)
+ val e =
intercept[IllegalStateException](server.frontendServices.head.connectionUrl)
+ assert(e.getMessage startsWith "Illegal Service State: LATENT")
+ assert(server.getConf === null)
+
+ server.initialize(conf)
+ assert(server.getServiceState === INITIALIZED)
+ val frontendService = server.frontendServices.head
+ assert(frontendService.getServiceState == INITIALIZED)
+ assert(server.frontendServices.head.connectionUrl.split(":").length === 2)
+ assert(server.getConf === conf)
+ assert(server.getStartTime === 0)
+ server.stop()
+
+ server.start()
+ assert(server.getServiceState === STARTED)
+ assert(frontendService.getServiceState == STARTED)
+ assert(server.getStartTime !== 0)
+
+ server.stop()
+ assert(server.getServiceState === STOPPED)
+ assert(frontendService.getServiceState == STOPPED)
+ server.stop()
+ }
+}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/service/NoopMySQLFrontendServer.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/service/NoopMySQLFrontendServer.scala
new file mode 100644
index 0000000..6e6fec3
--- /dev/null
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/service/NoopMySQLFrontendServer.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.service
+
+import org.apache.kyuubi.server.KyuubiMySQLFrontendService
+
+class NoopMySQLFrontendServer extends
AbstractNoopServer("NoopMySQLFrontendServer") {
+
+ override val frontendServices = Seq(new KyuubiMySQLFrontendService(this))
+}