This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1061d17  [KYUUBI #1358] Add KyuubiMySQLFrontendService stub
1061d17 is described below

commit 1061d176c832967c57be60fb4c908191ddc2dfd9
Author: Cheng Pan <[email protected]>
AuthorDate: Thu Nov 11 16:50:52 2021 +0800

    [KYUUBI #1358] Add KyuubiMySQLFrontendService stub
    
    ### _Why are the changes needed?_
    
    Add KyuubiMySQLFrontendService stub, without Netty pipeline, part of #1334
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run 
test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests)
 locally before make a pull request
    
    Closes #1358 from pan3793/mysql-fe-stub.
    
    Closes #1358
    
    c091d955 [Cheng Pan] nit
    61abb0f6 [Cheng Pan] Address comments
    9ac70456 [Cheng Pan] Update conf
    3d75acdf [Cheng Pan] nit
    4a4d8a24 [Cheng Pan] Address comments
    0136dd52 [Cheng Pan] nit
    477474ff [Cheng Pan] KyuubiMySQLFrontendService Stub
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 conf/log4j.properties.template                     |   3 +
 docs/deployment/settings.md                        |   9 ++
 kyuubi-common/pom.xml                              |   5 +
 .../org/apache/kyuubi/config/KyuubiConf.scala      |  69 ++++++++---
 .../scala/org/apache/kyuubi/util/NettyUtils.scala  |  71 ++++++++++++
 kyuubi-server/pom.xml                              |   5 -
 .../kyuubi/server/KyuubiMySQLFrontendService.scala | 126 +++++++++++++++++++++
 .../kyuubi/server/KyuubiRestFrontendService.scala  |  12 +-
 kyuubi-server/src/test/resources/log4j.properties  |   3 +
 .../org/apache/kyuubi/RestFrontendTestHelper.scala |   2 +-
 .../server/KyuubiMySQLFrontendServiceSuite.scala   |  56 +++++++++
 .../kyuubi/service/NoopMySQLFrontendServer.scala   |  25 ++++
 12 files changed, 361 insertions(+), 25 deletions(-)

diff --git a/conf/log4j.properties.template b/conf/log4j.properties.template
index 4543adb..82cf9b1 100644
--- a/conf/log4j.properties.template
+++ b/conf/log4j.properties.template
@@ -25,3 +25,6 @@ log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd 
HH:mm:ss.SSS} %p %
 # Set the default kyuubi-ctl log level to WARN. When running the kyuubi-ctl, 
the
 # log level for this class is used to overwrite the root logger's log level.
 log4j.logger.org.apache.kyuubi.ctl.ServiceControlCli=ERROR
+
+# Analysis MySQLFrontend protocol traffic
+# log4j.logger.org.apache.kyuubi.server.mysql.codec=TRACE
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 8ac415f..1e8fe97 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -205,6 +205,12 @@ kyuubi\.frontend\.login<br>\.timeout|<div style='width: 
65pt;word-wrap: break-wo
 kyuubi\.frontend\.max<br>\.message\.size|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>104857600</div>|<div style='width: 
170pt;word-wrap: break-word;white-space: normal'>(deprecated) Maximum message 
size in bytes a Kyuubi server will accept.</div>|<div style='width: 
30pt'>int</div>|<div style='width: 20pt'>1.0.0</div>
 kyuubi\.frontend\.max<br>\.worker\.threads|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>999</div>|<div style='width: 170pt;word-wrap: 
break-word;white-space: normal'>(deprecated) Maximum number of threads in the 
of frontend worker thread pool for the thrift frontend service</div>|<div 
style='width: 30pt'>int</div>|<div style='width: 20pt'>1.0.0</div>
 kyuubi\.frontend\.min<br>\.worker\.threads|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>9</div>|<div style='width: 170pt;word-wrap: 
break-word;white-space: normal'>(deprecated) Minimum number of threads in the 
of frontend worker thread pool for the thrift frontend service</div>|<div 
style='width: 30pt'>int</div>|<div style='width: 20pt'>1.0.0</div>
+kyuubi\.frontend\.mysql<br>\.bind\.host|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 
170pt;word-wrap: break-word;white-space: normal'>Hostname or IP of the machine 
on which to run the MySQL frontend service.</div>|<div style='width: 
30pt'>string</div>|<div style='width: 20pt'>1.4.0</div>
+kyuubi\.frontend\.mysql<br>\.bind\.port|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>3309</div>|<div style='width: 170pt;word-wrap: 
break-word;white-space: normal'>Port of the machine on which to run the MySQL 
frontend service.</div>|<div style='width: 30pt'>int</div>|<div style='width: 
20pt'>1.4.0</div>
+kyuubi\.frontend\.mysql<br>\.max\.worker\.threads|<div style='width: 
65pt;word-wrap: break-word;white-space: normal'>999</div>|<div style='width: 
170pt;word-wrap: break-word;white-space: normal'>Maximum number of threads in 
the command execution thread pool for the MySQL frontend service</div>|<div 
style='width: 30pt'>int</div>|<div style='width: 20pt'>1.4.0</div>
+kyuubi\.frontend\.mysql<br>\.min\.worker\.threads|<div style='width: 
65pt;word-wrap: break-word;white-space: normal'>9</div>|<div style='width: 
170pt;word-wrap: break-word;white-space: normal'>Minimum number of threads in 
the command execution thread pool for the MySQL frontend service</div>|<div 
style='width: 30pt'>int</div>|<div style='width: 20pt'>1.4.0</div>
+kyuubi\.frontend\.mysql<br>\.netty\.worker\.threads|<div style='width: 
65pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div 
style='width: 170pt;word-wrap: break-word;white-space: normal'>Number of thread 
in the netty worker event loop of MySQL frontend service. Use min(cpu_cores, 8) 
in default.</div>|<div style='width: 30pt'>int</div>|<div style='width: 
20pt'>1.4.0</div>
+kyuubi\.frontend\.mysql<br>\.worker\.keepalive\.time|<div style='width: 
65pt;word-wrap: break-word;white-space: normal'>PT1M</div>|<div style='width: 
170pt;word-wrap: break-word;white-space: normal'>Time(ms) that an idle async 
thread of the command execution thread pool will wait for a new task to arrive 
before terminating in MySQL frontend service</div>|<div style='width: 
30pt'>duration</div>|<div style='width: 20pt'>1.4.0</div>
 kyuubi\.frontend<br>\.protocols|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>THRIFT_BINARY</div>|<div style='width: 
170pt;word-wrap: break-word;white-space: normal'>A comma separated list for all 
frontend protocols <ul> <li>THRIFT_BINARY - HiveServer2 compatible thrift 
binary protocol.</li> <li>REST - Kyuubi defined REST API(experimental).</li> 
</ul></div>|<div style='width: 30pt'>seq</div>|<div style='width: 
20pt'>1.4.0</div>
 kyuubi\.frontend\.rest<br>\.bind\.host|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 
170pt;word-wrap: break-word;white-space: normal'>Hostname or IP of the machine 
on which to run the REST frontend service.</div>|<div style='width: 
30pt'>string</div>|<div style='width: 20pt'>1.4.0</div>
 kyuubi\.frontend\.rest<br>\.bind\.port|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>10099</div>|<div style='width: 170pt;word-wrap: 
break-word;white-space: normal'>Port of the machine on which to run the REST 
frontend service.</div>|<div style='width: 30pt'>int</div>|<div style='width: 
20pt'>1.4.0</div>
@@ -373,6 +379,9 @@ 
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %p %
 # Set the default kyuubi-ctl log level to WARN. When running the kyuubi-ctl, 
the
 # log level for this class is used to overwrite the root logger's log level.
 log4j.logger.org.apache.kyuubi.ctl.ServiceControlCli=ERROR
+
+# Analysis MySQLFrontend protocol traffic
+# log4j.logger.org.apache.kyuubi.server.mysql.codec=TRACE
 ```
 
 ## Other Configurations
diff --git a/kyuubi-common/pom.xml b/kyuubi-common/pom.xml
index 02988bd..90923f9 100644
--- a/kyuubi-common/pom.xml
+++ b/kyuubi-common/pom.xml
@@ -99,6 +99,11 @@
         </dependency>
 
         <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-minikdc</artifactId>
             <scope>test</scope>
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index f738642..0a3eac3 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -27,6 +27,7 @@ import scala.collection.JavaConverters._
 import org.apache.kyuubi.{Logging, Utils}
 import org.apache.kyuubi.engine.{EngineType, ShareLevel}
 import org.apache.kyuubi.service.authentication.{AuthTypes, SaslQOP}
+import org.apache.kyuubi.util.NettyUtils.MAX_NETTY_THREADS
 
 case class KyuubiConf(loadSysDefault: Boolean = true) extends Logging {
   import KyuubiConf._
@@ -136,6 +137,8 @@ case class KyuubiConf(loadSysDefault: Boolean = true) 
extends Logging {
     FRONTEND_THRIFT_BINARY_BIND_PORT,
     FRONTEND_REST_BIND_HOST,
     FRONTEND_REST_BIND_PORT,
+    FRONTEND_MYSQL_BIND_HOST,
+    FRONTEND_MYSQL_BIND_PORT,
     AUTHENTICATION_METHOD,
     KINIT_INTERVAL)
 
@@ -274,7 +277,6 @@ object KyuubiConf {
         s"the frontend protocol should be one or more of 
${FrontendProtocols.values.mkString(",")}")
       .createWithDefault(Seq(FrontendProtocols.THRIFT_BINARY.toString))
 
-  @deprecated(s"using ${FRONTEND_THRIFT_BINARY_BIND_HOST.key} instead", 
"1.4.0")
   val FRONTEND_BIND_HOST: OptionalConfigEntry[String] = 
buildConf("frontend.bind.host")
     .doc("(deprecated) Hostname or IP of the machine on which to run the 
thrift frontend service " +
       "via binary protocol.")
@@ -300,11 +302,10 @@ object KyuubiConf {
 
   val FRONTEND_THRIFT_BINARY_BIND_PORT: ConfigEntry[Int] =
     buildConf("frontend.thrift.binary.bind.port")
-    .doc("Port of the machine on which to run the thrift frontend service via 
binary protocol.")
-    .version("1.4.0")
-    .fallbackConf(FRONTEND_BIND_PORT)
+      .doc("Port of the machine on which to run the thrift frontend service 
via binary protocol.")
+      .version("1.4.0")
+      .fallbackConf(FRONTEND_BIND_PORT)
 
-  @deprecated(s"using ${FRONTEND_THRIFT_MIN_WORKER_THREADS.key} instead", 
"1.4.0")
   val FRONTEND_MIN_WORKER_THREADS: ConfigEntry[Int] = 
buildConf("frontend.min.worker.threads")
     .doc("(deprecated) Minimum number of threads in the of frontend worker 
thread pool for " +
       "the thrift frontend service")
@@ -319,7 +320,6 @@ object KyuubiConf {
     .version("1.4.0")
     .fallbackConf(FRONTEND_MIN_WORKER_THREADS)
 
-  @deprecated(s"using ${FRONTEND_THRIFT_MAX_WORKER_THREADS.key} instead", 
"1.4.0")
   val FRONTEND_MAX_WORKER_THREADS: ConfigEntry[Int] = 
buildConf("frontend.max.worker.threads")
     .doc("(deprecated) Maximum number of threads in the of frontend worker 
thread pool for " +
       "the thrift frontend service")
@@ -334,7 +334,6 @@ object KyuubiConf {
     .version("1.4.0")
     .fallbackConf(FRONTEND_MAX_WORKER_THREADS)
 
-  @deprecated(s"using ${FRONTEND_THRIFT_WORKER_KEEPALIVE_TIME.key} instead", 
"1.4.0")
   val FRONTEND_WORKER_KEEPALIVE_TIME: ConfigEntry[Long] =
     buildConf("frontend.worker.keepalive.time")
       .doc("(deprecated) Keep-alive time (in milliseconds) for an idle worker 
thread")
@@ -348,7 +347,7 @@ object KyuubiConf {
       .version("1.4.0")
       .fallbackConf(FRONTEND_WORKER_KEEPALIVE_TIME)
 
-  @deprecated(s"using ${FRONTEND_THRIFT_WORKER_KEEPALIVE_TIME.key} instead", 
"1.4.0")
+  @deprecated(s"using ${FRONTEND_THRIFT_MAX_MESSAGE_SIZE.key} instead", 
"1.4.0")
   val FRONTEND_MAX_MESSAGE_SIZE: ConfigEntry[Int] =
     buildConf("frontend.max.message.size")
       .doc("(deprecated) Maximum message size in bytes a Kyuubi server will 
accept.")
@@ -486,11 +485,11 @@ object KyuubiConf {
     .transform(_.toLowerCase(Locale.ROOT))
     .createWithDefault(SaslQOP.AUTH.toString)
 
-  val FRONTEND_REST_BIND_HOST: OptionalConfigEntry[String] = 
buildConf("frontend.rest.bind.host")
-    .doc("Hostname or IP of the machine on which to run the REST frontend 
service.")
-    .version("1.4.0")
-    .stringConf
-    .createOptional
+  val FRONTEND_REST_BIND_HOST: ConfigEntry[Option[String]] =
+    buildConf("frontend.rest.bind.host")
+      .doc("Hostname or IP of the machine on which to run the REST frontend 
service.")
+      .version("1.4.0")
+      .fallbackConf(FRONTEND_BIND_HOST)
 
   val FRONTEND_REST_BIND_PORT: ConfigEntry[Int] = 
buildConf("frontend.rest.bind.port")
     .doc("Port of the machine on which to run the REST frontend service.")
@@ -499,6 +498,50 @@ object KyuubiConf {
     .checkValue(p => p == 0 || (p > 1024 && p < 65535), "Invalid Port number")
     .createWithDefault(10099)
 
+  val FRONTEND_MYSQL_BIND_HOST: ConfigEntry[Option[String]] =
+    buildConf("frontend.mysql.bind.host")
+      .doc("Hostname or IP of the machine on which to run the MySQL frontend 
service.")
+      .version("1.4.0")
+      .fallbackConf(FRONTEND_BIND_HOST)
+
+  val FRONTEND_MYSQL_BIND_PORT: ConfigEntry[Int] = 
buildConf("frontend.mysql.bind.port")
+    .doc("Port of the machine on which to run the MySQL frontend service.")
+    .version("1.4.0")
+    .intConf
+    .checkValue(p => p == 0 || (p > 1024 && p < 65535), "Invalid Port number")
+    .createWithDefault(3309)
+
+  val FRONTEND_MYSQL_NETTY_WORKER_THREADS: OptionalConfigEntry[Int] =
+    buildConf("frontend.mysql.netty.worker.threads")
+      .doc("Number of thread in the netty worker event loop of MySQL frontend 
service. " +
+        s"Use min(cpu_cores, $MAX_NETTY_THREADS) in default.")
+      .version("1.4.0")
+      .intConf
+      .checkValue(n => n > 0 && n <= MAX_NETTY_THREADS,
+        s"Invalid thread number, must in (0, $MAX_NETTY_THREADS]")
+      .createOptional
+
+  val FRONTEND_MYSQL_MIN_WORKER_THREADS: ConfigEntry[Int] =
+    buildConf("frontend.mysql.min.worker.threads")
+      .doc("Minimum number of threads in the command execution thread pool for 
the MySQL " +
+        "frontend service")
+      .version("1.4.0")
+      .fallbackConf(FRONTEND_MIN_WORKER_THREADS)
+
+  val FRONTEND_MYSQL_MAX_WORKER_THREADS: ConfigEntry[Int] =
+    buildConf("frontend.mysql.max.worker.threads")
+      .doc("Maximum number of threads in the command execution thread pool for 
the MySQL " +
+        "frontend service")
+      .version("1.4.0")
+      .fallbackConf(FRONTEND_MAX_WORKER_THREADS)
+
+  val FRONTEND_MYSQL_WORKER_KEEPALIVE_TIME: ConfigEntry[Long] =
+    buildConf("frontend.mysql.worker.keepalive.time")
+      .doc("Time(ms) that an idle async thread of the command execution thread 
pool will wait" +
+        " for a new task to arrive before terminating in MySQL frontend 
service")
+      .version("1.4.0")
+      .fallbackConf(FRONTEND_WORKER_KEEPALIVE_TIME)
+
   
/////////////////////////////////////////////////////////////////////////////////////////////////
   //                                 SQL Engine Configuration                  
                  //
   
/////////////////////////////////////////////////////////////////////////////////////////////////
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/NettyUtils.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/NettyUtils.scala
new file mode 100644
index 0000000..c5b6e2e
--- /dev/null
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/NettyUtils.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.util
+
+import java.util.concurrent.ThreadFactory
+
+import scala.math.min
+
+import io.netty.channel._
+import io.netty.channel.epoll._
+import io.netty.channel.nio.NioEventLoopGroup
+import io.netty.channel.socket.nio.{NioServerSocketChannel, NioSocketChannel}
+import io.netty.util.concurrent.DefaultThreadFactory
+
+object NettyUtils {
+
+  /**
+   * Specifies an upper bound on the number of Netty threads that Kyuubi 
requires by default.
+   * In practice, only 2-4 cores should be required to transfer roughly 10 
Gb/s, and each core
+   * that we use will have an initial overhead of roughly 32 MB of off-heap 
memory, which comes
+   * at a premium.
+   *
+   * Thus, this value should still retain maximum throughput and reduce wasted 
off-heap memory
+   * allocation.
+   */
+  val MAX_NETTY_THREADS: Int = 8
+
+  val EPOLL_MODE: Boolean = Epoll.isAvailable
+
+  val CLIENT_CHANNEL_CLASS: Class[_ <: Channel] =
+    if (EPOLL_MODE) classOf[EpollSocketChannel] else classOf[NioSocketChannel]
+
+  val SERVER_CHANNEL_CLASS: Class[_ <: ServerChannel] =
+    if (EPOLL_MODE) classOf[EpollServerSocketChannel] else 
classOf[NioServerSocketChannel]
+
+  def createThreadFactory(threadPoolPrefix: String): ThreadFactory =
+    new DefaultThreadFactory(threadPoolPrefix, true)
+
+  def createEventLoop(numThreads: Int, threadPrefix: String): EventLoopGroup = 
{
+    val threadFactory = createThreadFactory(threadPrefix)
+    if (EPOLL_MODE) {
+      new EpollEventLoopGroup(numThreads, threadFactory)
+    } else {
+      new NioEventLoopGroup(numThreads, threadFactory)
+    }
+  }
+
+  /**
+   * Returns the default number of threads for the Netty thread pools. If 
numUsableCores is absent,
+   * we will use Runtime get an approximate number of available cores.
+   */
+  def defaultNumThreads(numUsableCores: Option[Int]): Int = numUsableCores 
match {
+    case Some(num) => min(num, MAX_NETTY_THREADS)
+    case None => min(sys.runtime.availableProcessors, MAX_NETTY_THREADS)
+  }
+}
diff --git a/kyuubi-server/pom.xml b/kyuubi-server/pom.xml
index 821bde6..ecf336b 100644
--- a/kyuubi-server/pom.xml
+++ b/kyuubi-server/pom.xml
@@ -203,11 +203,6 @@
         </dependency>
 
         <dependency>
-            <groupId>io.netty</groupId>
-            <artifactId>netty-all</artifactId>
-        </dependency>
-
-        <dependency>
             <groupId>org.glassfish.jersey.core</groupId>
             <artifactId>jersey-server</artifactId>
         </dependency>
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiMySQLFrontendService.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiMySQLFrontendService.scala
new file mode 100644
index 0000000..5c2800d
--- /dev/null
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiMySQLFrontendService.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server
+
+import java.net.{InetAddress, InetSocketAddress}
+import java.util.concurrent.{ThreadPoolExecutor, TimeUnit}
+
+import io.netty.bootstrap.ServerBootstrap
+import io.netty.buffer.PooledByteBufAllocator
+import io.netty.channel.{ChannelFuture, ChannelInitializer, ChannelOption}
+import io.netty.channel.socket.SocketChannel
+import io.netty.handler.logging.{LoggingHandler, LogLevel}
+
+import org.apache.kyuubi.{KyuubiException, Logging, Utils}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.service.{AbstractFrontendService, Serverable, Service}
+import org.apache.kyuubi.util.ExecutorPoolCaptureOom
+import org.apache.kyuubi.util.NettyUtils._
+
+/**
+ * A frontend service implement MySQL protocol.
+ */
+class KyuubiMySQLFrontendService(override val serverable: Serverable)
+  extends AbstractFrontendService("KyuubiMySQLFrontendService") with Logging {
+
+  private var execPool: ThreadPoolExecutor = _
+
+  private var serverAddr: InetAddress = _
+  private var port: Int = _
+  private var bootstrap: ServerBootstrap = _
+  private var bindFuture: ChannelFuture = _
+
+  @volatile protected var isStarted = false
+
+  protected def oomHook: Runnable = () => serverable.stop()
+
+  override def initialize(conf: KyuubiConf): Unit = synchronized {
+    val minThreads = conf.get(FRONTEND_MYSQL_MIN_WORKER_THREADS)
+    val maxThreads = conf.get(FRONTEND_MYSQL_MAX_WORKER_THREADS)
+    val keepAliveMs = conf.get(FRONTEND_MYSQL_WORKER_KEEPALIVE_TIME)
+    execPool = ExecutorPoolCaptureOom(
+      "mysql-exec-pool",
+      minThreads, maxThreads,
+      keepAliveMs,
+      oomHook)
+
+    serverAddr = conf.get(FRONTEND_MYSQL_BIND_HOST)
+      .map(InetAddress.getByName)
+      .getOrElse(Utils.findLocalInetAddress)
+    port = conf.get(FRONTEND_MYSQL_BIND_PORT)
+    val workerThreads = 
defaultNumThreads(conf.get(FRONTEND_MYSQL_NETTY_WORKER_THREADS))
+    val bossGroup = createEventLoop(1, "mysql-netty-boss")
+    val workerGroup = createEventLoop(workerThreads, "mysql-netty-worker")
+    bootstrap = new ServerBootstrap()
+      .group(bossGroup, workerGroup)
+      .channel(SERVER_CHANNEL_CLASS)
+      .option(ChannelOption.SO_BACKLOG, Int.box(128))
+      .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+      .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+      .childOption(ChannelOption.TCP_NODELAY, Boolean.box(true))
+      .childHandler(new ChannelInitializer[SocketChannel] {
+        override def initChannel(channel: SocketChannel): Unit = 
channel.pipeline
+          .addLast(new LoggingHandler("org.apache.kyuubi.server.mysql.codec", 
LogLevel.TRACE))
+        // TODO implement authentication, codec, command handler
+      })
+    super.initialize(conf)
+  }
+
+  override def connectionUrl: String = {
+    checkInitialized()
+    s"${serverAddr.getCanonicalHostName}:$port"
+  }
+
+  override def start(): Unit = synchronized {
+    if (!isStarted) {
+      try {
+        bindFuture = bootstrap.bind(serverAddr, port)
+        bindFuture.syncUninterruptibly
+        port = 
bindFuture.channel.localAddress.asInstanceOf[InetSocketAddress].getPort
+        isStarted = true
+        info(s"MySQL frontend service has started at $connectionUrl.")
+      } catch {
+        case rethrow: Exception =>
+          throw new KyuubiException("Cannot start MySQL frontend service Netty 
server", rethrow)
+      }
+    }
+    super.start()
+  }
+
+  override def stop(): Unit = synchronized {
+    if (isStarted) {
+      if (bindFuture != null) {
+        // close is a local operation and should finish within milliseconds; 
timeout just to be safe
+        bindFuture.channel.close.awaitUninterruptibly(10, TimeUnit.SECONDS)
+        bindFuture = null
+      }
+      if (bootstrap != null && bootstrap.config.group != null) {
+        bootstrap.config.group.shutdownGracefully
+      }
+      if (bootstrap != null && bootstrap.config.childGroup != null) {
+        bootstrap.config.childGroup.shutdownGracefully
+      }
+      bootstrap = null
+      isStarted = false
+    }
+    super.stop()
+  }
+
+  override val discoveryService: Option[Service] = None
+}
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
index 8b87044..f620a33 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
@@ -34,12 +34,12 @@ import org.apache.kyuubi.service.{AbstractFrontendService, 
Serverable, Service}
  * Note: Currently, it only be used in the Kyuubi Server side.
  */
 class KyuubiRestFrontendService(override val serverable: Serverable)
-  extends AbstractFrontendService("RestFrontendService") with Logging {
+  extends AbstractFrontendService("KyuubiRestFrontendService") with Logging {
 
-  var serverAddr: InetAddress = _
-  var portNum: Int = _
-  var jettyServer: Server = _
-  var connector: ServerConnector = _
+  private var serverAddr: InetAddress = _
+  private var portNum: Int = _
+  private var jettyServer: Server = _
+  private var connector: ServerConnector = _
 
   @volatile protected var isStarted = false
 
@@ -84,13 +84,13 @@ class KyuubiRestFrontendService(override val serverable: 
Serverable)
       try {
         connector.start()
         jettyServer.start()
+        isStarted = true
         info(s"Rest frontend service jetty server has started at 
${jettyServer.getURI}.")
       } catch {
         case rethrow: Exception =>
           stopHttpServer()
           throw new KyuubiException("Cannot start rest frontend service jetty 
server", rethrow)
       }
-      isStarted = true
     }
 
     super.start()
diff --git a/kyuubi-server/src/test/resources/log4j.properties 
b/kyuubi-server/src/test/resources/log4j.properties
index 958c9c8..06881c9 100644
--- a/kyuubi-server/src/test/resources/log4j.properties
+++ b/kyuubi-server/src/test/resources/log4j.properties
@@ -18,6 +18,9 @@
 # Set everything to be logged to the file target/unit-tests.log
 log4j.rootLogger=INFO, CA, FA
 
+# Analysis MySQLFrontend protocol traffic
+# log4j.logger.org.apache.kyuubi.server.mysql.codec=TRACE
+
 #Console Appender
 log4j.appender.CA=org.apache.log4j.ConsoleAppender
 log4j.appender.CA.layout=org.apache.log4j.PatternLayout
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/RestFrontendTestHelper.scala 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/RestFrontendTestHelper.scala
index d222bb8..8c87752 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/RestFrontendTestHelper.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/RestFrontendTestHelper.scala
@@ -57,7 +57,7 @@ trait RestFrontendTestHelper {
     val server = new NoopRestFrontendServer()
     server.stop()
     val conf = KyuubiConf()
-    conf.set(KyuubiConf.FRONTEND_REST_BIND_HOST, restFrontendHost)
+    conf.set(KyuubiConf.FRONTEND_REST_BIND_HOST, Some(restFrontendHost))
 
     server.initialize(conf)
     server.start()
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/KyuubiMySQLFrontendServiceSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/KyuubiMySQLFrontendServiceSuite.scala
new file mode 100644
index 0000000..735863b
--- /dev/null
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/KyuubiMySQLFrontendServiceSuite.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.service.NoopMySQLFrontendServer
+import org.apache.kyuubi.service.ServiceState._
+
+class KyuubiMySQLFrontendServiceSuite extends KyuubiFunSuite {
+
+  test("Kyuubi MySQL frontend service basic") {
+    val server = new NoopMySQLFrontendServer
+    server.stop()
+    val conf = KyuubiConf()
+    assert(server.getServices.isEmpty)
+    assert(server.getServiceState === LATENT)
+    val e = 
intercept[IllegalStateException](server.frontendServices.head.connectionUrl)
+    assert(e.getMessage startsWith "Illegal Service State: LATENT")
+    assert(server.getConf === null)
+
+    server.initialize(conf)
+    assert(server.getServiceState === INITIALIZED)
+    val frontendService = server.frontendServices.head
+    assert(frontendService.getServiceState == INITIALIZED)
+    assert(server.frontendServices.head.connectionUrl.split(":").length === 2)
+    assert(server.getConf === conf)
+    assert(server.getStartTime === 0)
+    server.stop()
+
+    server.start()
+    assert(server.getServiceState === STARTED)
+    assert(frontendService.getServiceState == STARTED)
+    assert(server.getStartTime !== 0)
+
+    server.stop()
+    assert(server.getServiceState === STOPPED)
+    assert(frontendService.getServiceState == STOPPED)
+    server.stop()
+  }
+}
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/service/NoopMySQLFrontendServer.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/service/NoopMySQLFrontendServer.scala
new file mode 100644
index 0000000..6e6fec3
--- /dev/null
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/service/NoopMySQLFrontendServer.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.service
+
+import org.apache.kyuubi.server.KyuubiMySQLFrontendService
+
+class NoopMySQLFrontendServer extends 
AbstractNoopServer("NoopMySQLFrontendServer") {
+
+  override val frontendServices = Seq(new KyuubiMySQLFrontendService(this))
+}

Reply via email to