This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 116369a [KYUUBI #1376] Implement MySQL Frontend based on Netty
116369a is described below
commit 116369ac8450421d6772dea68153dfb07e329118
Author: Cheng Pan <[email protected]>
AuthorDate: Mon Nov 22 20:35:18 2021 +0800
[KYUUBI #1376] Implement MySQL Frontend based on Netty
### _Why are the changes needed?_
Close #1219.
Implement the MySQL text protocol with basic query support, so that the mysql
CLI can be used to run Spark SQL.
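
As an illustration of what this enables, a client could also connect through the stock MySQL JDBC driver. The sketch below is not part of the patch: the object name, the host, the port `3309`, and the `useSSL=false` parameter are assumptions chosen for the example, and it presumes `mysql-connector-java` is on the classpath and that `MYSQL` is listed in `kyuubi.frontend.protocols`.

```scala
import java.sql.DriverManager

// Hypothetical helper, for illustration only.
object MySQLFrontendClientSketch {

  // Build a MySQL-style JDBC URL. SSL is switched off here because the
  // frontend does not support it (see the limitations listed in this commit).
  def jdbcUrl(host: String, port: Int, database: String = "default"): String =
    s"jdbc:mysql://$host:$port/$database?useSSL=false"

  // Run a query and collect the first column of every row as a string.
  def firstColumn(url: String, user: String, password: String, sql: String): Seq[String] = {
    val conn = DriverManager.getConnection(url, user, password)
    try {
      val stmt = conn.createStatement()
      try {
        val rs = stmt.executeQuery(sql)
        val rows = Seq.newBuilder[String]
        while (rs.next()) rows += rs.getString(1)
        rows.result()
      } finally stmt.close()
    } finally conn.close()
  }
}
```

A call such as `firstColumn(jdbcUrl("localhost", 3309), "user", "", "select 1")` needs a running server with the MySQL frontend enabled; the address and credentials are placeholders.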
Test:
Add `MySQLSparkQuerySuite`.
Change `OutputSchemaTPCDSSuite` from **Thrift binary protocol** to **MySQL
protocol**.
Limitations:
1) only the mysql_native_password authentication mechanism is supported
2) only the MySQL text protocol is supported
3) result rows larger than 16M cannot be sent back
4) server-side prepared statements are not supported
5) engine_conf/session_conf cannot be overridden via JDBC URL parameters,
as the Hive JDBC client allows
6) MySQL metadata queries have limited support, so some MySQL GUI clients that
send metadata queries implicitly may not work as expected
7) SSL is not supported
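
The 16M figure in limitation 3 follows from the MySQL packet header, which stores the payload length in 3 bytes. A minimal, dependency-free sketch of that header layout is shown below; the object and method names are hypothetical and are not the ones used in `MySQLCodec.scala`.

```scala
// Hypothetical illustration of the 4-byte MySQL packet header:
// a 3-byte little-endian payload length followed by a 1-byte sequence ID.
object MySQLPacketHeaderSketch {

  // Largest payload a single packet can describe: 2^24 - 1 bytes.
  val MaxPayloadLength: Int = (1 << 24) - 1

  def encodeHeader(payloadLength: Int, sequenceId: Int): Array[Byte] = {
    require(
      payloadLength >= 0 && payloadLength <= MaxPayloadLength,
      s"payload length $payloadLength does not fit in 3 bytes")
    Array(
      (payloadLength & 0xff).toByte,         // least significant byte first
      ((payloadLength >> 8) & 0xff).toByte,
      ((payloadLength >> 16) & 0xff).toByte,
      (sequenceId & 0xff).toByte)
  }

  // Read the payload length back from the first three header bytes.
  def decodePayloadLength(header: Array[Byte]): Int =
    (header(0) & 0xff) | ((header(1) & 0xff) << 8) | ((header(2) & 0xff) << 16)
}
```

Payloads at or above that bound have to be split across several packets, which this patch does not implement when sending results.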
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #1376 from pan3793/mysql-fe-impl.
Closes #1376
96d3efb9 [Cheng Pan] Kyuubi MySQL FE
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
---
docs/deployment/settings.md | 2 +-
.../org/apache/kyuubi/config/KyuubiConf.scala | 3 +-
.../kyuubi/operation/HiveJDBCTestHelper.scala | 88 ++----
.../apache/kyuubi/operation/JDBCTestHelper.scala | 100 +++++++
.../kyuubi/server/KyuubiMySQLFrontendService.scala | 10 +-
.../org/apache/kyuubi/server/KyuubiServer.scala | 3 +
.../apache/kyuubi/server/mysql/MySQLCodec.scala | 101 +++++++
.../kyuubi/server/mysql/MySQLCommandHandler.scala | 196 ++++++++++++++
.../kyuubi/server/mysql/MySQLDialectHelper.scala | 89 ++++++
.../apache/kyuubi/server/mysql/MySQLField.scala | 22 ++
.../kyuubi/server/mysql/MySQLNullBitmap.scala | 59 ++++
.../kyuubi/server/mysql/MySQLQueryResult.scala | 140 ++++++++++
.../mysql/authentication/MySQLAuthHandler.scala | 67 +++++
.../server/mysql/constant/MySQLCtxAttrKey.scala | 33 +++
.../scala/org/apache/kyuubi/WithKyuubiServer.scala | 16 +-
.../operation/tpcds/OutputSchemaTPCDSSuite.scala | 4 +-
.../kyuubi/server/mysql/MySQLJDBCTestHelper.scala | 49 ++++
.../kyuubi/server/mysql/MySQLSparkQuerySuite.scala | 297 +++++++++++++++++++++
18 files changed, 1196 insertions(+), 83 deletions(-)
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 4f342d1..3c4ba32 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -214,7 +214,7 @@ kyuubi\.frontend\.mysql<br>\.max\.worker\.threads|<div
style='width: 65pt;word-w
kyuubi\.frontend\.mysql<br>\.min\.worker\.threads|<div style='width:
65pt;word-wrap: break-word;white-space: normal'>9</div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>Minimum number of threads in
the command execution thread pool for the MySQL frontend service</div>|<div
style='width: 30pt'>int</div>|<div style='width: 20pt'>1.4.0</div>
kyuubi\.frontend\.mysql<br>\.netty\.worker\.threads|<div style='width:
65pt;word-wrap: break-word;white-space: normal'><undefined></div>|<div
style='width: 170pt;word-wrap: break-word;white-space: normal'>Number of thread
in the netty worker event loop of MySQL frontend service. Use min(cpu_cores, 8)
in default.</div>|<div style='width: 30pt'>int</div>|<div style='width:
20pt'>1.4.0</div>
kyuubi\.frontend\.mysql<br>\.worker\.keepalive\.time|<div style='width:
65pt;word-wrap: break-word;white-space: normal'>PT1M</div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>Time(ms) that an idle async
thread of the command execution thread pool will wait for a new task to arrive
before terminating in MySQL frontend service</div>|<div style='width:
30pt'>duration</div>|<div style='width: 20pt'>1.4.0</div>
-kyuubi\.frontend<br>\.protocols|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'>THRIFT_BINARY</div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>A comma separated list for all
frontend protocols <ul> <li>THRIFT_BINARY - HiveServer2 compatible thrift
binary protocol.</li> <li>REST - Kyuubi defined REST API(experimental).</li>
</ul></div>|<div style='width: 30pt'>seq</div>|<div style='width:
20pt'>1.4.0</div>
+kyuubi\.frontend<br>\.protocols|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'>THRIFT_BINARY</div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>A comma separated list for all
frontend protocols <ul> <li>THRIFT_BINARY - HiveServer2 compatible thrift
binary protocol.</li> <li>REST - Kyuubi defined REST API(experimental).</li>
<li>MYSQL - MySQL compatible text protocol(experimental).</li> </ul></div>|<div
style='width: 30pt'>seq</div>|<div style=' [...]
kyuubi\.frontend\.rest<br>\.bind\.host|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'><undefined></div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>Hostname or IP of the machine
on which to run the REST frontend service.</div>|<div style='width:
30pt'>string</div>|<div style='width: 20pt'>1.4.0</div>
kyuubi\.frontend\.rest<br>\.bind\.port|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'>10099</div>|<div style='width: 170pt;word-wrap:
break-word;white-space: normal'>Port of the machine on which to run the REST
frontend service.</div>|<div style='width: 30pt'>int</div>|<div style='width:
20pt'>1.4.0</div>
kyuubi\.frontend<br>\.thrift\.backoff\.slot<br>\.length|<div style='width:
65pt;word-wrap: break-word;white-space: normal'>PT0.1S</div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>Time to back off during login
to the thrift frontend service.</div>|<div style='width:
30pt'>duration</div>|<div style='width: 20pt'>1.4.0</div>
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index ebbaf16..408808b 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -261,7 +261,7 @@ object KyuubiConf {
object FrontendProtocols extends Enumeration {
type FrontendProtocol = Value
- val THRIFT_BINARY, REST = Value
+ val THRIFT_BINARY, REST, MYSQL = Value
}
val FRONTEND_PROTOCOLS: ConfigEntry[Seq[String]] =
@@ -270,6 +270,7 @@ object KyuubiConf {
"<ul>" +
" <li>THRIFT_BINARY - HiveServer2 compatible thrift binary protocol.</li>" +
" <li>REST - Kyuubi defined REST API(experimental).</li> " +
+ " <li>MYSQL - MySQL compatible text protocol(experimental).</li> " +
"</ul>")
.version("1.4.0")
.stringConf
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/HiveJDBCTestHelper.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/HiveJDBCTestHelper.scala
index 38ff79a..fd0c226 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/HiveJDBCTestHelper.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/HiveJDBCTestHelper.scala
@@ -17,8 +17,7 @@
package org.apache.kyuubi.operation
-import java.sql.{DriverManager, ResultSet, SQLException, Statement}
-import java.util.Locale
+import java.sql.ResultSet
import scala.collection.JavaConverters._
@@ -29,26 +28,26 @@ import org.apache.thrift.protocol.TBinaryProtocol
import org.apache.thrift.transport.TSocket
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
-import org.apache.kyuubi.{KyuubiFunSuite, Utils}
+import org.apache.kyuubi.Utils
import org.apache.kyuubi.service.authentication.PlainSASLHelper
-trait HiveJDBCTestHelper extends KyuubiFunSuite {
+trait HiveJDBCTestHelper extends JDBCTestHelper {
- // Load KyuubiHiveDriver class before using it, otherwise will cause the first call
- // `DriverManager.getConnection("jdbc:hive2://...")` failure.
- // Don't know why, Apache Spark also does the same thing.
- def hiveJdbcDriverClass: String = "org.apache.kyuubi.jdbc.KyuubiHiveDriver"
- Class.forName(hiveJdbcDriverClass)
+ def jdbcDriverClass: String = "org.apache.kyuubi.jdbc.KyuubiHiveDriver"
- protected def defaultSchema = "default"
protected def matchAllPatterns = Seq("", "*", "%", null, ".*", "_*", "_%", ".%")
- protected lazy val user: String = Utils.currentUser
+
+ protected override lazy val user: String = Utils.currentUser
+ protected override val password = "anonymous"
private var _sessionConfigs: Map[String, String] = Map.empty
private var _jdbcConfigs: Map[String, String] = Map.empty
private var _jdbcVars: Map[String, String] = Map.empty
- protected def sessionConfigs: Map[String, String] = _sessionConfigs
- protected def jdbcConfigs: Map[String, String] = _jdbcConfigs
- protected def jdbcVars: Map[String, String] = _jdbcVars
+
+ protected override def sessionConfigs: Map[String, String] = _sessionConfigs
+
+ protected override def jdbcConfigs: Map[String, String] = _jdbcConfigs
+
+ protected override def jdbcVars: Map[String, String] = _jdbcVars
def withSessionConf[T](
sessionConfigs: Map[String, String] = Map.empty)(
@@ -65,11 +64,7 @@ trait HiveJDBCTestHelper extends KyuubiFunSuite {
}
}
- protected def jdbcUrl: String
-
- protected def jdbcUrlWithConf: String = jdbcUrlWithConf(jdbcUrl)
-
- protected def jdbcUrlWithConf(jdbcUrl: String): String = {
+ protected override def jdbcUrlWithConf(jdbcUrl: String): String = {
val sessionConfStr = sessionConfigs.map(kv => kv._1 + "=" + kv._2).mkString(";")
val jdbcConfStr =
if (jdbcConfigs.isEmpty) {
@@ -86,63 +81,12 @@ trait HiveJDBCTestHelper extends KyuubiFunSuite {
jdbcUrl + sessionConfStr + jdbcConfStr + jdbcVarsStr
}
- def assertJDBCConnectionFail(jdbcUrl: String = jdbcUrlWithConf): SQLException = {
- intercept[SQLException](DriverManager.getConnection(jdbcUrl, user, ""))
- }
-
- def withMultipleConnectionJdbcStatement(
- tableNames: String*)(fs: (Statement => Unit)*): Unit = {
- val connections = fs.map { _ => DriverManager.getConnection(jdbcUrlWithConf, user, "") }
- val statements = connections.map(_.createStatement())
-
- try {
- statements.zip(fs).foreach { case (s, f) => f(s) }
- } finally {
- tableNames.foreach { name =>
- if (name.toUpperCase(Locale.ROOT).startsWith("VIEW")) {
- statements.head.execute(s"DROP VIEW IF EXISTS $name")
- } else {
- statements.head.execute(s"DROP TABLE IF EXISTS $name")
- }
- }
- info("Closing statements")
- statements.foreach(_.close())
- info("Closed statements")
- info("Closing connections")
- connections.foreach(_.close())
- info("Closed connections")
- }
- }
-
- def withDatabases(dbNames: String*)(fs: (Statement => Unit)*): Unit = {
- val connections = fs.map { _ => DriverManager.getConnection(jdbcUrlWithConf, user, "") }
- val statements = connections.map(_.createStatement())
-
- try {
- statements.zip(fs).foreach { case (s, f) => f(s) }
- } finally {
- dbNames.reverse.foreach { name =>
- statements.head.execute(s"DROP DATABASE IF EXISTS $name")
- }
- info("Closing statements")
- statements.foreach(_.close())
- info("Closed statements")
- info("Closing connections")
- connections.foreach(_.close())
- info("Closed connections")
- }
- }
-
- def withJdbcStatement(tableNames: String*)(f: Statement => Unit): Unit = {
- withMultipleConnectionJdbcStatement(tableNames: _*)(f)
- }
-
def withThriftClient(f: TCLIService.Iface => Unit): Unit = {
val hostAndPort = jdbcUrl.stripPrefix("jdbc:hive2://").split("/;").head.split(":")
val host = hostAndPort.head
val port = hostAndPort(1).toInt
val socket = new TSocket(host, port)
- val transport = PlainSASLHelper.getPlainTransport(Utils.currentUser, "anonymous", socket)
+ val transport = PlainSASLHelper.getPlainTransport(Utils.currentUser, password, socket)
val protocol = new TBinaryProtocol(transport)
val client = new TCLIService.Client(protocol)
@@ -158,7 +102,7 @@ trait HiveJDBCTestHelper extends KyuubiFunSuite {
withThriftClient { client =>
val req = new TOpenSessionReq()
req.setUsername(user)
- req.setPassword("anonymous")
+ req.setPassword(password)
req.setConfiguration(_sessionConfigs.asJava)
val resp = client.OpenSession(req)
val handle = resp.getSessionHandle
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTestHelper.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTestHelper.scala
new file mode 100644
index 0000000..9978a5a
--- /dev/null
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTestHelper.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.operation
+
+import java.sql.{DriverManager, SQLException, Statement}
+import java.util.Locale
+
+import org.apache.kyuubi.KyuubiFunSuite
+
+trait JDBCTestHelper extends KyuubiFunSuite {
+ // Load Driver class before using it, otherwise may cause the first call
+ // `DriverManager.getConnection("jdbc:[protocol]://...")` failure.
+ Class.forName(jdbcDriverClass)
+
+ def jdbcDriverClass: String
+
+ protected def user: String
+
+ protected def password: String
+
+ protected def defaultSchema = "default"
+
+ protected def sessionConfigs: Map[String, String]
+
+ protected def jdbcConfigs: Map[String, String]
+
+ protected def jdbcVars: Map[String, String]
+
+ protected def jdbcUrl: String
+
+ protected def jdbcUrlWithConf: String = jdbcUrlWithConf(jdbcUrl)
+
+ protected def jdbcUrlWithConf(jdbcUrl: String): String
+
+ def assertJDBCConnectionFail(jdbcUrl: String = jdbcUrlWithConf): SQLException = {
+ intercept[SQLException](DriverManager.getConnection(jdbcUrl, user, password))
+ }
+
+ def withMultipleConnectionJdbcStatement(
+ tableNames: String*)(fs: (Statement => Unit)*): Unit = {
+ val connections = fs.map { _ => DriverManager.getConnection(jdbcUrlWithConf, user, password) }
+ val statements = connections.map(_.createStatement())
+
+ try {
+ statements.zip(fs).foreach { case (s, f) => f(s) }
+ } finally {
+ tableNames.foreach { name =>
+ if (name.toUpperCase(Locale.ROOT).startsWith("VIEW")) {
+ statements.head.execute(s"DROP VIEW IF EXISTS $name")
+ } else {
+ statements.head.execute(s"DROP TABLE IF EXISTS $name")
+ }
+ }
+ info("Closing statements")
+ statements.foreach(_.close())
+ info("Closed statements")
+ info("Closing connections")
+ connections.foreach(_.close())
+ info("Closed connections")
+ }
+ }
+
+ def withDatabases(dbNames: String*)(fs: (Statement => Unit)*): Unit = {
+ val connections = fs.map { _ => DriverManager.getConnection(jdbcUrlWithConf, user, password) }
+ val statements = connections.map(_.createStatement())
+
+ try {
+ statements.zip(fs).foreach { case (s, f) => f(s) }
+ } finally {
+ dbNames.reverse.foreach { name =>
+ statements.head.execute(s"DROP DATABASE IF EXISTS $name")
+ }
+ info("Closing statements")
+ statements.foreach(_.close())
+ info("Closed statements")
+ info("Closing connections")
+ connections.foreach(_.close())
+ info("Closed connections")
+ }
+ }
+
+ def withJdbcStatement(tableNames: String*)(f: Statement => Unit): Unit = {
+ withMultipleConnectionJdbcStatement(tableNames: _*)(f)
+ }
+}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiMySQLFrontendService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiMySQLFrontendService.scala
index 6cce9ef..9fa8bb0 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiMySQLFrontendService.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiMySQLFrontendService.scala
@@ -26,9 +26,11 @@ import io.netty.channel.{ChannelFuture, ChannelInitializer, ChannelOption}
import io.netty.channel.socket.SocketChannel
import io.netty.handler.logging.{LoggingHandler, LogLevel}
-import org.apache.kyuubi.{KyuubiException, Logging, Utils}
+import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.server.mysql._
+import org.apache.kyuubi.server.mysql.authentication.MySQLAuthHandler
import org.apache.kyuubi.service.{AbstractFrontendService, Serverable, Service}
import org.apache.kyuubi.util.ExecutorPoolCaptureOom
import org.apache.kyuubi.util.NettyUtils._
@@ -78,7 +80,11 @@ class KyuubiMySQLFrontendService(override val serverable: Serverable)
.childHandler(new ChannelInitializer[SocketChannel] {
override def initChannel(channel: SocketChannel): Unit =
channel.pipeline
.addLast(new LoggingHandler("org.apache.kyuubi.server.mysql.codec", LogLevel.TRACE))
- // TODO implement authentication, codec, command handler
+ .addLast(new MySQLFrameDelimiter)
+ .addLast(new MySQLPacketEncoder)
+ .addLast(new MySQLAuthHandler)
+ .addLast(new MySQLPacketDecoder)
+ .addLast(new MySQLCommandHandler(serverable.backendService, execPool))
})
super.initialize(conf)
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
index 72e49a0..c194a09 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
@@ -134,6 +134,9 @@ class KyuubiServer(name: String) extends Serverable(name) {
case REST =>
warn("REST frontend protocol is experimental, API may change in the future.")
new KyuubiRestFrontendService(this)
+ case MYSQL =>
+ warn("MYSQL frontend protocol is experimental.")
+ new KyuubiMySQLFrontendService(this)
case other =>
throw new UnsupportedOperationException(s"Frontend protocol $other is not supported yet.")
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLCodec.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLCodec.scala
new file mode 100644
index 0000000..013a6bb
--- /dev/null
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLCodec.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.mysql
+
+import java.nio.ByteOrder
+import java.util.{List => JList}
+
+import io.netty.buffer.ByteBuf
+import io.netty.channel.ChannelHandlerContext
+import io.netty.handler.codec._
+
+import org.apache.kyuubi.server.mysql.MySQLCodec._
+import org.apache.kyuubi.server.mysql.MySQLRichByteBuf.Implicit
+import org.apache.kyuubi.server.mysql.constant._
+import org.apache.kyuubi.server.mysql.constant.MySQLCommandPacketType._
+
+object MySQLCodec {
+ val PAYLOAD_LENGTH = 3
+ val SEQUENCE_LENGTH = 1
+}
+
+/**
+ * {{{
+ * lengthFieldOffset = 0
+ * lengthFieldLength = 3
+ * lengthAdjustment = 1
+ * initialBytesToStrip = 3
+ *
+ * BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes)
+ * +----------+------+----------------+ +------+----------------+
+ * | Length | Seq | Actual Content |----->| Seq | Actual Content |
+ * | 0x00000C | 0x01 | "HELLO, WORLD" | | 0x01 | "HELLO, WORLD" |
+ * +----------+------+----------------+ +------+----------------+
+ * }}}
+ *
+ * If the payload is larger than or equal to 2^24-1 bytes, the length is set to 2^24-1
+ * (ff ff ff) and additional packets are sent with the rest of the payload until the payload
+ * of a packet is less than 2^24-1 bytes.
+ */
+class MySQLFrameDelimiter extends LengthFieldBasedFrameDecoder(
+ ByteOrder.LITTLE_ENDIAN,
+ 1 << 24,
+ 0,
+ PAYLOAD_LENGTH,
+ SEQUENCE_LENGTH,
+ PAYLOAD_LENGTH,
+ true)
+
+class MySQLPacketDecoder extends ByteToMessageDecoder {
+ override def decode(ctx: ChannelHandlerContext, payload: ByteBuf, out: JList[AnyRef]): Unit = {
+ require(0 == payload.readInt1, "Sequence ID of MySQL command packet must be 0")
+ val cmdPacket: MySQLCommandPacket = payload.readInt1 match {
+ case COM_PING.value => MySQLComPingPacket()
+ case COM_QUIT.value => MySQLComQuitPacket()
+ case COM_INIT_DB.value => MySQLComInitDbPacket.decode(payload)
+ case COM_QUERY.value => MySQLComQueryPacket.decode(payload)
+ case COM_STMT_PREPARE.value | COM_STMT_EXECUTE.value
+ | COM_STMT_RESET.value | COM_STMT_CLOSE.value
+ | COM_FIELD_LIST.value =>
+ throw new UnsupportedOperationException(
+ "Currently Kyuubi does not support server side prepared statement")
+ case unsupported =>
+ MySQLUnsupportedCommandPacket(MySQLCommandPacketType.valueOf(unsupported))
+ }
+ out.add(cmdPacket)
+ }
+}
+
+class MySQLPacketEncoder extends MessageToByteEncoder[SupportsEncode] {
+ override def encode(ctx: ChannelHandlerContext, msg: SupportsEncode, out: ByteBuf): Unit = {
+ try msg.encode(prepareMessageHeader(out).markWriterIndex)
+ catch {
+ case ex: Exception =>
+ out.resetWriterIndex
+ MySQLErrPacket(1, MySQLErrorCode.UNKNOWN_EXCEPTION, ex.getMessage).encode(out)
+ } finally updateMessageHeader(out, msg.sequenceId)
+ }
+
+ private def prepareMessageHeader(out: ByteBuf): ByteBuf =
+ out.writeReserved(PAYLOAD_LENGTH + SEQUENCE_LENGTH)
+
+ private def updateMessageHeader(out: ByteBuf, sequenceId: Int): Unit = {
+ out.setMediumLE(0, out.readableBytes - PAYLOAD_LENGTH - SEQUENCE_LENGTH)
+ out.setByte(PAYLOAD_LENGTH + SEQUENCE_LENGTH - 1, sequenceId)
+ }
+}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLCommandHandler.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLCommandHandler.scala
new file mode 100644
index 0000000..c5e55a7
--- /dev/null
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLCommandHandler.scala
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.mysql
+
+import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor}
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
+import scala.util.{Failure, Success}
+
+import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}
+import org.apache.hive.service.rpc.thrift.TProtocolVersion
+
+import org.apache.kyuubi.{KyuubiSQLException, Logging}
+import org.apache.kyuubi.operation.FetchOrientation
+import org.apache.kyuubi.operation.OperationState._
+import org.apache.kyuubi.server.mysql.MySQLCommandHandler._
+import org.apache.kyuubi.server.mysql.constant.MySQLCtxAttrKey._
+import org.apache.kyuubi.service.BackendService
+import org.apache.kyuubi.session.SessionHandle
+
+object MySQLCommandHandler {
+ val connIdCounter = new AtomicInteger
+ val connIdToSessHandle = new ConcurrentHashMap[Int, SessionHandle]
+}
+
+class MySQLCommandHandler(be: BackendService, execPool: ThreadPoolExecutor)
+ extends SimpleChannelInboundHandler[MySQLCommandPacket] with Logging {
+
+ private implicit val ec: ExecutionContextExecutor = ExecutionContext.fromExecutor(execPool)
+
+ @volatile private var closed: Boolean = false
+
+ override def channelInactive(ctx: ChannelHandlerContext): Unit = {
+ closeSession(ctx)
+ super.channelInactive(ctx)
+ }
+
+ // handle process exception, generally should send error packet
+ override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
+ val connectionId = ctx.channel.attr(CONNECTION_ID).get
+ val errPacket = MySQLErrPacket(cause)
+ error(s"Connection: $connectionId, $errPacket")
+ if (ctx.channel.isActive) {
+ ctx.writeAndFlush(errPacket)
+ } else {
+ warn(s"Ignore error packet for inactivated connection: $connectionId")
+ }
+ }
+
+ override def channelRead0(ctx: ChannelHandlerContext, packet: MySQLCommandPacket): Unit = Future {
+ ensureSessionOpened(ctx)
+ packet match {
+ case pkt: MySQLComPingPacket => handlePing(ctx, pkt)
+ case pkt: MySQLComInitDbPacket => handleInitDb(ctx, pkt)
+ case pkt: MySQLComQuitPacket => handleQuit(ctx, pkt)
+ case pkt: MySQLComQueryPacket => handleQuery(ctx, pkt)
+ case bad => throw new UnsupportedOperationException(bad.getClass.getSimpleName)
+ }
+ } onComplete {
+ case Success(responsePackets) =>
+ responsePackets.foreach(ctx.channel.write)
+ ctx.channel.flush()
+ case Failure(cause) =>
+ exceptionCaught(ctx, cause)
+ }
+
+ def ensureSessionOpened(ctx: ChannelHandlerContext): Unit =
+ if (ctx.channel.attr(SESSION_HANDLE).get == null) synchronized {
+ if (ctx.channel.attr(SESSION_HANDLE).get == null) {
+ val sessionHandle = openSession(ctx)
+ ctx.channel.attr(SESSION_HANDLE).set(sessionHandle)
+ val connectionId = ctx.channel.attr(CONNECTION_ID).get
+ connIdToSessHandle.put(connectionId, sessionHandle)
+ }
+ }
+
+ def openSession(ctx: ChannelHandlerContext): SessionHandle = synchronized {
+ try {
+ val user = ctx.channel.attr(USER).get
+ val remoteIp = ctx.channel.attr(REMOTE_IP).get
+ // TODO parse SET command, save other variables at ChannelHandlerContext
+ val sessionConf = Option(ctx.channel.attr(DATABASE).get) match {
+ case Some(db) => Map("use:database" -> db)
+ case None => Map.empty[String, String]
+ }
+ // v1 is sufficient now, upgrade version when needed
+ val proto = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1
+ val sessionHandle = be.openSession(proto, user, "", remoteIp, sessionConf)
+ sessionHandle
+ } catch {
+ case rethrow: Exception =>
+ warn(s"Error opening session: ${rethrow.getMessage}")
+ throw rethrow
+ }
+ }
+
+ def closeSession(ctx: ChannelHandlerContext): Unit = synchronized {
+ if (!closed) {
+ val handle = ctx.channel.attr(SESSION_HANDLE).get
+ info(s"Received request of closing $handle")
+ try be.closeSession(handle)
+ catch {
+ case rethrow: Exception =>
+ warn(s"Error closing session: ${rethrow.getMessage}")
+ throw rethrow
+ } finally {
+ val connectionId = ctx.channel.attr(CONNECTION_ID).getAndSet(null)
+ ctx.channel.attr(SESSION_HANDLE).set(null)
+ connIdToSessHandle.remove(connectionId)
+ }
+ closed = true
+ info(s"Finished closing $handle")
+ }
+ }
+
+ def handlePing(
+ ctx: ChannelHandlerContext,
+ pkg: MySQLComPingPacket
+ ): Seq[MySQLPacket] = {
+ MySQLOKPacket(1) :: Nil
+ }
+
+ def handleInitDb(
+ ctx: ChannelHandlerContext,
+ pkg: MySQLComInitDbPacket
+ ): Seq[MySQLPacket] = {
+ beExecuteStatement(ctx, s"use ${pkg.database}")
+ MySQLOKPacket(1) :: Nil
+ }
+
+ def handleQuit(
+ ctx: ChannelHandlerContext,
+ pkg: MySQLComQuitPacket
+ ): Seq[MySQLPacket] = {
+ closeSession(ctx)
+ MySQLOKPacket(1) :: Nil
+ }
+
+ def handleQuery(
+ ctx: ChannelHandlerContext,
+ pkg: MySQLComQueryPacket
+ ): Seq[MySQLPacket] = {
+ debug(s"Receive query: ${pkg.sql}")
+ executeStatement(ctx, pkg.sql).toPackets
+ }
+
+ def executeStatement(ctx: ChannelHandlerContext, sql: String): MySQLQueryResult = {
+ val newSQL = MySQLDialectHelper.convertQuery(sql)
+ if (sql != newSQL) debug(s"Converted to $newSQL")
+
+ if (MySQLDialectHelper.shouldExecuteLocal(newSQL)) {
+ MySQLDialectHelper.localExecuteStatement(ctx, newSQL)
+ } else {
+ beExecuteStatement(ctx, newSQL)
+ }
+ }
+
+ private def beExecuteStatement(ctx: ChannelHandlerContext, sql: String): MySQLQueryResult = {
+ try {
+ val ssHandle = ctx.channel.attr(SESSION_HANDLE).get
+ val opHandle = be.executeStatement(ssHandle, sql, runAsync = false, queryTimeout = 0)
+ val opStatus = be.getOperationStatus(opHandle)
+ if (opStatus.state != FINISHED) {
+ throw opStatus.exception
+ .getOrElse(KyuubiSQLException(s"Error operator state ${opStatus.state}"))
+ }
+ val tableSchema = be.getResultSetMetadata(opHandle)
+ val rowSet = be.fetchResults(
+ opHandle,
+ FetchOrientation.FETCH_NEXT,
+ Int.MaxValue,
+ fetchLog = false)
+ MySQLQueryResult(tableSchema, rowSet)
+ } catch {
+ case rethrow: Exception =>
+ warn("Error executing statement: ", rethrow)
+ throw rethrow
+ }
+ }
+}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLDialectHelper.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLDialectHelper.scala
new file mode 100644
index 0000000..df9cf9b
--- /dev/null
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLDialectHelper.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.mysql
+
+import io.netty.channel.ChannelHandlerContext
+
+import org.apache.kyuubi.server.mysql.constant._
+
+object MySQLDialectHelper {
+
+ def convertQuery(origin: String): String = origin.toLowerCase.trim match {
+ case "select @@version_comment limit 1" =>
+ s"select '${MySQLServerDefines.KYUUBI_SERVER_DESCRIPTION}' as `@@version_comment`"
+ case "select database()" =>
+ "select current_database() as `database()`"
+ case "select database(), user() limit 1" =>
+ """select
+ | current_database() as `database()`,
+ | session_user() as `user()`
+ |""".stripMargin
+ // scalastyle:off line.size.limit
+ case "select @@character_set_client, @@character_set_connection, @@character_set_server, @@character_set_database limit 1" =>
+ """select
+ | 'utf8mb4' as `@@character_set_client`,
+ | 'utf8mb4' as `@@character_set_connection`,
+ | 'utf8mb4' as `@@character_set_server`,
+ | 'utf8mb4' as `@@character_set_database`
+ |""".stripMargin
+ // mysql-connector-java:8 initialized query
+ case sql
+ if sql.contains("select @@session.auto_increment_increment as
auto_increment_increment, @@character_set_client as character_set_client,
@@character_set_connection as character_set_connection, @@character_set_results
as character_set_results, @@character_set_server as character_set_server,
@@collation_server as collation_server, @@collation_connection as
collation_connection, @@init_connect as init_connect, @@interactive_timeout as
interactive_timeout, @@license as license, @@low [...]
+ """SELECT
+ | 1 AS auto_increment_increment,
+ | 'utf8mb4' AS character_set_client,
+ | 'utf8mb4' AS character_set_connection,
+ | 'utf8mb4' AS character_set_results,
+ | 'utf8mb4' AS character_set_server,
+ | 'utf8mb4_general_ci' AS collation_server,
+ | 'utf8mb4_general_ci' AS collation_connection,
+ | '' AS init_connect,
+ | 28800 AS interactive_timeout,
+ | 'Apache License 2.0' AS license,
+ | 0 AS lower_case_table_names,
+ | 4194304 AS max_allowed_packet,
+ | 60 AS net_write_timeout,
+ | 0 AS performance_schema,
+ | 1048576 AS query_cache_size,
+ | 'OFF' AS query_cache_type,
+ | 'ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION' AS sql_mode,
+ | 'UTC' AS system_time_zone,
+ | 'SYSTEM' AS time_zone,
+ | 'REPEATABLE-READ' AS transaction_isolation,
+ | '28800' AS wait_timeout
+ |""".stripMargin
+ // scalastyle:on line.size.limit
+ case "select @@session.transaction_read_only" =>
+ "select '0' as `@@session.transaction_read_only`"
+ case _ => origin
+ }
+
+ def shouldExecuteLocal(sql: String): Boolean = {
+ sql.trim.toLowerCase == "select 'kyuubi'" || sql.trim.toLowerCase.startsWith("kill query ")
+ }
+
+ def localExecuteStatement(ctx: ChannelHandlerContext, sql: String): MySQLQueryResult = {
+ // A mock result, for testing
+ if (sql.trim.toLowerCase == "select 'kyuubi'") {
+ return MySQLQueryResult(
+ MySQLField("kyuubi", MySQLDataType.VAR_STRING) :: Nil,
+ Seq(Seq("KYUUBI")))
+ }
+ throw MySQLErrorCode.ER_NOT_SUPPORTED_YET.toKyuubiSQLException
+ }
+}
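The rewriting done by `convertQuery` above can be tried in isolation. The following is a minimal stand-alone sketch, not part of this commit: `DialectRewriteSketch` and `ServerDescription` are hypothetical names, and the description string is a placeholder for `MySQLServerDefines.KYUUBI_SERVER_DESCRIPTION`, whose real value is not shown in this diff.

```scala
// Hypothetical stand-alone sketch; mirrors the matching style of convertQuery.
object DialectRewriteSketch {
  // Assumed placeholder for MySQLServerDefines.KYUUBI_SERVER_DESCRIPTION.
  val ServerDescription = "Kyuubi Server (sketch)"

  // Normalize case and surrounding whitespace, then rewrite known
  // MySQL client probes into SQL that a Spark engine can answer.
  def convertQuery(origin: String): String = origin.toLowerCase.trim match {
    case "select @@version_comment limit 1" =>
      s"select '$ServerDescription' as `@@version_comment`"
    case "select database()" =>
      "select current_database() as `database()`"
    // Anything unrecognized is passed through untouched, original case included.
    case _ => origin
  }

  def main(args: Array[String]): Unit = {
    println(convertQuery("  SELECT DATABASE()  "))
    println(convertQuery("SELECT 1"))
  }
}
```

Matching happens on the normalized text, but the fallback returns `origin`, so user queries reach the engine unmodified.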
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLField.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLField.scala
new file mode 100644
index 0000000..6ef1cfa
--- /dev/null
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLField.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.mysql
+
+import org.apache.kyuubi.server.mysql.constant.MySQLDataType
+
+case class MySQLField(name: String, dataType: MySQLDataType)
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLNullBitmap.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLNullBitmap.scala
new file mode 100644
index 0000000..b1b418f
--- /dev/null
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLNullBitmap.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.mysql
+
+import io.netty.buffer.ByteBuf
+
+import org.apache.kyuubi.server.mysql.MySQLRichByteBuf.Implicit
+
+object MySQLNullBitmap {
+
+ def apply(columnsNumbers: Int, offset: Int): MySQLNullBitmap = {
+ val nullBitmap = new Array[Int](calculateBytes(columnsNumbers, offset))
+ MySQLNullBitmap(offset, nullBitmap)
+ }
+
+ def apply(columnNumbers: Int, payload: ByteBuf): MySQLNullBitmap = {
+ val offset = 0
+ val nullBitmap = new Array[Int](calculateBytes(columnNumbers, 0))
+ fillBitmap(nullBitmap, payload)
+ MySQLNullBitmap(offset, nullBitmap)
+ }
+
+ private def calculateBytes(columnsNumbers: Int, offset: Int): Int =
+ (columnsNumbers + offset + 7) / 8
+
+ private def fillBitmap(nullBitmap: Array[Int], payload: ByteBuf): Unit = {
+ nullBitmap.indices.foreach(i => nullBitmap(i) = payload.readInt1)
+ }
+}
+
+case class MySQLNullBitmap private (
+ offset: Int,
+ nullBitmap: Array[Int] // it's not too efficient but convenient
+) {
+ def isNullParameter(index: Int): Boolean =
+ (nullBitmap(getBytePos(index)) & (1 << getBitPos(index))) != 0
+
+ def setNullBit(index: Int): Unit =
+ nullBitmap(getBytePos(index)) |= 1 << getBitPos(index)
+
+ private def getBytePos(index: Int): Int = (index + offset) / 8
+
+ private def getBitPos(index: Int): Int = (index + offset) % 8
+}
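The index arithmetic in `MySQLNullBitmap` can be checked by hand with a small sketch. This is illustrative only: `NullBitmapSketch` is a hypothetical name, and the offset of 2 and the column count of 7 are values assumed for the example rather than taken from this commit.

```scala
// Stand-alone sketch of the NULL-bitmap arithmetic used above.
object NullBitmapSketch {
  // Bytes needed to hold one bit per column, shifted by `offset` bits.
  def calculateBytes(columns: Int, offset: Int): Int = (columns + offset + 7) / 8

  // Which byte holds the bit for column `index`.
  def bytePos(index: Int, offset: Int): Int = (index + offset) / 8

  // Which bit inside that byte.
  def bitPos(index: Int, offset: Int): Int = (index + offset) % 8

  def main(args: Array[String]): Unit = {
    val offset = 2   // assumed for illustration
    val columns = 7  // assumed for illustration
    val bitmap = new Array[Int](calculateBytes(columns, offset))
    val nullColumn = 6
    // Same update as setNullBit: OR the column's bit into its byte.
    bitmap(bytePos(nullColumn, offset)) |= 1 << bitPos(nullColumn, offset)
    println(bitmap.mkString(","))
  }
}
```

With these values the bitmap needs (7 + 2 + 7) / 8 = 2 bytes, and column 6 lands in byte 1, bit 0.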
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLQueryResult.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLQueryResult.scala
new file mode 100644
index 0000000..4e634df
--- /dev/null
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLQueryResult.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.mysql
+
+import scala.collection.JavaConverters._
+
+import org.apache.hive.service.rpc.thrift._
+
+import org.apache.kyuubi.server.mysql.constant.MySQLDataType
+
+object MySQLQueryResult {
+
+ def apply(schema: Seq[MySQLField], rows: Seq[Seq[Any]]): MySQLSimpleQueryResult = {
+ new MySQLSimpleQueryResult(schema, rows)
+ }
+
+ def apply(schema: TTableSchema, rows: TRowSet): MySQLThriftQueryResult = {
+ new MySQLThriftQueryResult(schema, rows)
+ }
+}
+
+trait MySQLQueryResult {
+
+ def colCount: Int
+
+ def rowCount: Int
+
+ def toColDefinePackets: Seq[MySQLPacket]
+
+ def toRowPackets: Seq[MySQLPacket]
+
+ def toPackets: Seq[MySQLPacket] = {
+ val buf = Seq.newBuilder[MySQLPacket]
+ buf +=
+ MySQLFieldCountPacket(1, colCount) ++=
+ toColDefinePackets +=
+ MySQLEofPacket(colCount + 2) ++=
+ toRowPackets +=
+ MySQLEofPacket(colCount + rowCount + 3)
+ buf.result
+ }
+}
+
+class MySQLSimpleQueryResult(
+ schema: Seq[MySQLField],
+ rows: Seq[Seq[Any]]
+) extends MySQLQueryResult {
+
+ override def colCount: Int = schema.size
+
+ override def rowCount: Int = rows.size
+
+ override def toColDefinePackets: Seq[MySQLPacket] =
+ schema.zipWithIndex.map { case (field, i) =>
+ val sequenceId = 2 + i
+ val decimals = 0 // TODO
+ MySQLColumnDefinition41Packet(
+ sequenceId = sequenceId,
+ flags = 0,
+ name = field.name,
+ columnLength = 100,
+ columnType = field.dataType,
+ decimals = decimals)
+ }
+
+ override def toRowPackets: Seq[MySQLPacket] =
+ rows.zipWithIndex.map { case (row, i) =>
+ val sequenceId = colCount + 3 + i
+ MySQLTextResultSetRowPacket(sequenceId = sequenceId, row = row)
+ }
+}
+
+class MySQLThriftQueryResult(
+ schema: TTableSchema,
+ rows: TRowSet
+) extends MySQLQueryResult {
+
+ override def colCount: Int = schema.getColumnsSize
+
+ override def rowCount: Int = rows.getRows.size
+
+ override def toColDefinePackets: Seq[MySQLPacket] = schema.getColumns.asScala
+ .zipWithIndex.map { case (tCol, i) => tColDescToMySQL(tCol, 2 + i) }
+
+ override def toRowPackets: Seq[MySQLPacket] = rows.getRows.asScala
+ .zipWithIndex.map { case (tRow, i) => tRowToMySQL(tRow, colCount + 3 + i) }
+
+ private def tColDescToMySQL(
+ tCol: TColumnDesc,
+ sequenceId: Int
+ ): MySQLColumnDefinition41Packet = {
+ val tType = tCol.getTypeDesc
+ val dataType = tTypeDescToMySQL(tType)
+ val decimals = 0 // TODO
+ MySQLColumnDefinition41Packet(
+ sequenceId = sequenceId,
+ flags = 0,
+ name = tCol.getColumnName,
+ columnLength = 100,
+ columnType = dataType,
+ decimals = decimals)
+ }
+
+ private def tRowToMySQL(tRow: TRow, sequenceId: Int): MySQLTextResultSetRowPacket = {
+ val mysqlRow = tRow.getColVals.asScala.map {
+ case tVal: TColumnValue if tVal.isSetBoolVal => tVal.getBoolVal.isValue
+ case tVal: TColumnValue if tVal.isSetByteVal => tVal.getByteVal.getValue
+ case tVal: TColumnValue if tVal.isSetI16Val => tVal.getI16Val.getValue
+ case tVal: TColumnValue if tVal.isSetI32Val => tVal.getI32Val.getValue
+ case tVal: TColumnValue if tVal.isSetI64Val => tVal.getI64Val.getValue
+ case tVal: TColumnValue if tVal.isSetDoubleVal => tVal.getDoubleVal.getValue
+ case tVal: TColumnValue if tVal.isSetStringVal => tVal.getStringVal.getValue
+ }
+ MySQLTextResultSetRowPacket(sequenceId = sequenceId, row = mysqlRow)
+ }
+
+ private def tTypeDescToMySQL(typeDesc: TTypeDesc): MySQLDataType =
+ typeDesc.getTypes.asScala.toList match {
+ case entry :: Nil if entry.isSetPrimitiveEntry =>
+ MySQLDataType.valueOfThriftType(entry.getPrimitiveEntry.getType)
+ case _ =>
+ // MySQL does not support nested data types
+ MySQLDataType.VAR_STRING
+ }
+}
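The sequence ids assigned in `toPackets`, `toColDefinePackets` and `toRowPackets` form one contiguous run starting at 1. A minimal sketch of that layout follows; `SequenceIdSketch` and `Layout` are hypothetical names introduced only for illustration, and the arithmetic is copied from the code above.

```scala
// Stand-alone sketch of the packet sequence-id layout for a text result set.
object SequenceIdSketch {
  final case class Layout(
      fieldCount: Int,   // field-count packet
      colDefs: Seq[Int], // one column-definition packet per column
      firstEof: Int,     // EOF after the column definitions
      rows: Seq[Int],    // one row packet per row
      lastEof: Int)      // EOF after the rows

  def layout(colCount: Int, rowCount: Int): Layout = Layout(
    fieldCount = 1,
    colDefs = (0 until colCount).map(2 + _),
    firstEof = colCount + 2,
    rows = (0 until rowCount).map(colCount + 3 + _),
    lastEof = colCount + rowCount + 3)

  def main(args: Array[String]): Unit = {
    // 2 columns, 3 rows: ids run 1, 2-3, 4, 5-7, 8.
    println(layout(colCount = 2, rowCount = 3))
  }
}
```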
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/authentication/MySQLAuthHandler.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/authentication/MySQLAuthHandler.scala
new file mode 100644
index 0000000..2e70bc5
--- /dev/null
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/authentication/MySQLAuthHandler.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.mysql.authentication
+
+import io.netty.buffer.ByteBuf
+import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}
+
+import org.apache.kyuubi.Logging
+import org.apache.kyuubi.server.mysql.MySQLErrPacket
+import org.apache.kyuubi.server.mysql.constant.MySQLCapabilityFlag
+import org.apache.kyuubi.server.mysql.constant.MySQLCtxAttrKey._
+
+class MySQLAuthHandler extends SimpleChannelInboundHandler[ByteBuf] with Logging {
+
+ private val authenticationEngine = new MySQLAuthenticationEngine
+
+ @volatile private var authenticated = false
+
+ override def channelActive(ctx: ChannelHandlerContext): Unit = {
+ val nextConnectionId = authenticationEngine.handshake(ctx)
+ debug(s"Receive new MySQL connection: $nextConnectionId")
+ ctx.channel.attr(CONNECTION_ID).set(nextConnectionId)
+ ctx.channel.attr(CAPABILITY_FLAG).set(MySQLCapabilityFlag.handshakeValue)
+ ctx.fireChannelActive
+ }
+
+ override def channelRead0(ctx: ChannelHandlerContext, payload: ByteBuf): Unit = {
+ if (authenticated) {
+ ctx.fireChannelRead(payload.retainedSlice)
+ } else {
+ authenticated = authenticate(ctx, payload)
+ }
+ }
+
+ private def authenticate(ctx: ChannelHandlerContext, payload: ByteBuf): Boolean = {
+ try {
+ val authResult = authenticationEngine.authenticate(ctx, payload)
+ if (authResult.finished) {
+ ctx.channel.attr(USER).set(authResult.user)
+ ctx.channel.attr(REMOTE_IP).set(authResult.ip)
+ ctx.channel.attr(DATABASE).set(authResult.database)
+ return true
+ }
+ } catch {
+ case cause: Exception =>
+ error("Exception occurred: ", cause)
+ ctx.writeAndFlush(MySQLErrPacket(cause))
+ ctx.close
+ }
+ false
+ }
+}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/constant/MySQLCtxAttrKey.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/constant/MySQLCtxAttrKey.scala
new file mode 100644
index 0000000..6ddd3b2
--- /dev/null
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/constant/MySQLCtxAttrKey.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.mysql.constant
+
+import io.netty.util.AttributeKey
+import io.netty.util.AttributeKey._
+
+import org.apache.kyuubi.session.SessionHandle
+
+object MySQLCtxAttrKey {
+ val CONNECTION_ID: AttributeKey[Integer] = valueOf("CONNECTION_ID")
+ val CAPABILITY_FLAG: AttributeKey[Integer] = valueOf("CAPABILITY_FLAG")
+ val USER: AttributeKey[String] = valueOf[String]("USER")
+ val REMOTE_IP: AttributeKey[String] = valueOf[String]("REMOTE_IP")
+ val DATABASE: AttributeKey[String] = valueOf[String]("DATABASE")
+ val SESSION_HANDLE: AttributeKey[SessionHandle] = valueOf[SessionHandle]("SESSION_HANDLE")
+ val OP_HANDLE: AttributeKey[SessionHandle] = valueOf[SessionHandle]("OP_HANDLE")
+}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServer.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServer.scala
index 6696f29..f8c33b4 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServer.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServer.scala
@@ -28,25 +28,31 @@ trait WithKyuubiServer extends KyuubiFunSuite {
protected val conf: KyuubiConf
+ protected val frontendProtocols: Seq[FrontendProtocols.Value] =
+ FrontendProtocols.THRIFT_BINARY :: Nil
+
private var zkServer: EmbeddedZookeeper = _
- private var server: KyuubiServer = _
+ protected var server: KyuubiServer = _
override def beforeAll(): Unit = {
+ conf.set(FRONTEND_PROTOCOLS, frontendProtocols.map(_.toString))
+ conf.set(FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
+ conf.set(FRONTEND_REST_BIND_PORT, 0)
+ conf.set(FRONTEND_MYSQL_BIND_PORT, 0)
+
zkServer = new EmbeddedZookeeper()
conf.set(ZookeeperConf.ZK_CLIENT_PORT, 0)
val zkData = Utils.createTempDir()
conf.set(ZookeeperConf.ZK_DATA_DIR, zkData.toString)
zkServer.initialize(conf)
zkServer.start()
+ conf.set(HA_ZK_QUORUM, zkServer.getConnectString)
+ conf.set(HA_ZK_AUTH_TYPE, ZooKeeperAuthTypes.NONE.toString)
conf.set("spark.ui.enabled", "false")
conf.setIfMissing("spark.sql.catalogImplementation", "in-memory")
- conf.set(FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
conf.setIfMissing(ENGINE_CHECK_INTERVAL, 3000L)
conf.setIfMissing(ENGINE_IDLE_TIMEOUT, 10000L)
- conf.set(HA_ZK_QUORUM, zkServer.getConnectString)
- conf.set(HA_ZK_AUTH_TYPE, ZooKeeperAuthTypes.NONE.toString)
-
// TODO KYUUBI #745
conf.setIfMissing(ENGINE_INIT_TIMEOUT, 300000L)
server = KyuubiServer.startServer(conf)
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/tpcds/OutputSchemaTPCDSSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/tpcds/OutputSchemaTPCDSSuite.scala
index 7a05981..6d69f83 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/tpcds/OutputSchemaTPCDSSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/tpcds/OutputSchemaTPCDSSuite.scala
@@ -22,7 +22,7 @@ import java.nio.file.{Files, Path, Paths}
import org.apache.kyuubi.{DeltaSuiteMixin, WithKyuubiServer}
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.operation.HiveJDBCTestHelper
+import org.apache.kyuubi.server.mysql.MySQLJDBCTestHelper
import org.apache.kyuubi.tags.{DeltaTest, ExtendedSQLTest}
// scalastyle:off line.size.limit
@@ -41,7 +41,7 @@ import org.apache.kyuubi.tags.{DeltaTest, ExtendedSQLTest}
@DeltaTest
@ExtendedSQLTest
class OutputSchemaTPCDSSuite extends WithKyuubiServer
- with HiveJDBCTestHelper
+ with MySQLJDBCTestHelper
with TPCDSHelper
with DeltaSuiteMixin {
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/mysql/MySQLJDBCTestHelper.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/mysql/MySQLJDBCTestHelper.scala
new file mode 100644
index 0000000..727fee6
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/mysql/MySQLJDBCTestHelper.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.mysql
+
+import org.apache.kyuubi.Utils
+import org.apache.kyuubi.operation.JDBCTestHelper
+
+trait MySQLJDBCTestHelper extends JDBCTestHelper {
+
+ override def jdbcDriverClass: String = "com.mysql.jdbc.Driver"
+
+ protected lazy val user: String = Utils.currentUser
+
+ protected val password: String = "kyuubi"
+
+ private val _jdbcConfigs: Map[String, String] = Map(
+ "useSSL" -> "false")
+
+ protected override def sessionConfigs: Map[String, String] = Map.empty
+
+ protected override def jdbcConfigs: Map[String, String] = _jdbcConfigs
+
+ protected override def jdbcVars: Map[String, String] = Map.empty
+
+ protected def jdbcUrlWithConf(jdbcUrl: String): String = {
+ val jdbcConfStr =
+ if (jdbcConfigs.isEmpty) {
+ ""
+ } else {
+ // MySQL Connector/J separates URL properties with '&', not ';'
+ "?" + jdbcConfigs.map(kv => kv._1 + "=" + kv._2).mkString("&")
+ }
+ jdbcUrl + jdbcConfStr
+ }
+}
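For reference, MySQL Connector/J expects URL properties after `?` to be joined with `&`. A minimal sketch of a helper in the spirit of `jdbcUrlWithConf` is shown here; `JdbcUrlSketch`, the host, the port and the second property are assumptions made up for the example, not values from this commit.

```scala
// Stand-alone sketch of building a MySQL JDBC URL with properties.
object JdbcUrlSketch {
  // A Seq of pairs keeps the property order deterministic.
  def jdbcUrlWithConf(jdbcUrl: String, configs: Seq[(String, String)]): String =
    if (configs.isEmpty) jdbcUrl
    else jdbcUrl + "?" + configs.map { case (k, v) => s"$k=$v" }.mkString("&")

  def main(args: Array[String]): Unit = {
    // Hypothetical host and port, for illustration only.
    val base = "jdbc:mysql://localhost:3309/"
    println(jdbcUrlWithConf(base, Seq("useSSL" -> "false", "connectTimeout" -> "1000")))
    println(jdbcUrlWithConf(base, Seq.empty))
  }
}
```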
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/mysql/MySQLSparkQuerySuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/mysql/MySQLSparkQuerySuite.scala
new file mode 100644
index 0000000..272619c
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/mysql/MySQLSparkQuerySuite.scala
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.mysql
+
+import java.sql._
+
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.kyuubi.{KYUUBI_VERSION, WithKyuubiServer}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.FrontendProtocols
+
+class MySQLSparkQuerySuite extends WithKyuubiServer with MySQLJDBCTestHelper {
+
+ override protected val conf: KyuubiConf = KyuubiConf()
+
+ override protected val frontendProtocols: Seq[KyuubiConf.FrontendProtocols.Value] =
+ FrontendProtocols.MYSQL :: Nil
+
+ override protected def getJdbcUrl: String =
+ s"jdbc:mysql://${server.frontendServices.head.connectionUrl}/"
+
+ override protected def jdbcUrl: String = getJdbcUrl
+
+ test("execute statement - select null") {
+ withJdbcStatement() { statement =>
+ val resultSet = statement.executeQuery("SELECT NULL AS col")
+ assert(resultSet.next())
+ assert(resultSet.getString("col") === null)
+ val metaData = resultSet.getMetaData
+ assert(metaData.getColumnType(1) === java.sql.Types.NULL)
+ assert(metaData.getPrecision(1) === 25)
+ assert(metaData.getScale(1) === 0)
+ }
+ }
+
+ test("execute statement - select boolean") {
+ withJdbcStatement() { statement =>
+ val resultSet = statement.executeQuery("SELECT false AS col")
+ assert(resultSet.next())
+ assert(!resultSet.getBoolean("col"))
+ val metaData = resultSet.getMetaData
+ assert(metaData.getColumnType(1) === java.sql.Types.TINYINT)
+ assert(metaData.getPrecision(1) === 3)
+ assert(metaData.getScale(1) === 0)
+ }
+ }
+
+ test("execute statement - select tinyint") {
+ withJdbcStatement() { statement =>
+ val resultSet = statement.executeQuery("SELECT 1Y AS col")
+ assert(resultSet.next())
+ assert(resultSet.getByte("col") === 1.toByte)
+ val metaData = resultSet.getMetaData
+ assert(metaData.getColumnType(1) === java.sql.Types.TINYINT)
+ assert(metaData.getPrecision(1) === 3)
+ assert(metaData.getScale(1) === 0)
+ }
+ }
+
+ test("execute statement - select smallint") {
+ withJdbcStatement() { statement =>
+ val resultSet = statement.executeQuery("SELECT 1S AS col")
+ assert(resultSet.next())
+ assert(resultSet.getShort("col") === 1.toShort)
+ val metaData = resultSet.getMetaData
+ assert(metaData.getColumnType(1) === java.sql.Types.SMALLINT)
+ assert(metaData.getPrecision(1) === 5)
+ assert(metaData.getScale(1) === 0)
+ }
+ }
+
+ test("execute statement - select int") {
+ withJdbcStatement() { statement =>
+ val resultSet = statement.executeQuery("SELECT 4 AS col")
+ assert(resultSet.next())
+ assert(resultSet.getInt("col") === 4)
+ val metaData = resultSet.getMetaData
+ assert(metaData.getColumnType(1) === java.sql.Types.INTEGER)
+ assert(metaData.getPrecision(1) === 10)
+ assert(metaData.getScale(1) === 0)
+ }
+ }
+
+ test("execute statement - select long") {
+ withJdbcStatement() { statement =>
+ val resultSet = statement.executeQuery("SELECT 4L AS col")
+ assert(resultSet.next())
+ assert(resultSet.getLong("col") === 4L)
+ val metaData = resultSet.getMetaData
+ assert(metaData.getColumnType(1) === java.sql.Types.BIGINT)
+ assert(metaData.getPrecision(1) === 19)
+ assert(metaData.getScale(1) === 0)
+ }
+ }
+
+ test("execute statement - select float") {
+ withJdbcStatement() { statement =>
+ val resultSet = statement.executeQuery("SELECT cast(1.2 as float) AS col")
+ assert(resultSet.next())
+ assert(resultSet.getFloat("col") === 1.2f)
+ val metaData = resultSet.getMetaData
+ assert(metaData.getColumnType(1) === java.sql.Types.REAL)
+ assert(metaData.getPrecision(1) === 100)
+ assert(metaData.getScale(1) === 0)
+ }
+ }
+
+ test("execute statement - select double") {
+ withJdbcStatement() { statement =>
+ val resultSet = statement.executeQuery("SELECT 4.2D AS col")
+ assert(resultSet.next())
+ assert(resultSet.getDouble("col") === 4.2d)
+ val metaData = resultSet.getMetaData
+ assert(metaData.getColumnType(1) === java.sql.Types.DOUBLE)
+ assert(metaData.getPrecision(1) === 100)
+ assert(metaData.getScale(1) === 0)
+ }
+ }
+
+ test("execute statement - select string") {
+ withJdbcStatement() { statement =>
+ val resultSet = statement.executeQuery("SELECT 'kentyao' AS col")
+ assert(resultSet.next())
+ assert(resultSet.getString("col") === "kentyao")
+ val metaData = resultSet.getMetaData
+ assert(metaData.getColumnType(1) === java.sql.Types.VARCHAR)
+ assert(metaData.getPrecision(1) === 25)
+ assert(metaData.getScale(1) === 0)
+ }
+ }
+
+ test("execute statement - select binary") {
+ withJdbcStatement() { statement =>
+ val resultSet = statement.executeQuery("SELECT cast('kyuubi' as binary) AS col")
+ assert(resultSet.next())
+ assert(resultSet.getObject("col") === "kyuubi")
+ val metaData = resultSet.getMetaData
+ assert(metaData.getColumnType(1) === java.sql.Types.CHAR)
+ assert(metaData.getPrecision(1) === 25)
+ assert(metaData.getScale(1) === 0)
+ }
+ }
+
+ test("execute statement - select date") {
+ withJdbcStatement() { statement =>
+ val resultSet = statement.executeQuery("SELECT DATE '2018-11-17' AS col")
+ assert(resultSet.next())
+ assert(resultSet.getDate("col") === Date.valueOf("2018-11-17"))
+ val metaData = resultSet.getMetaData
+ assert(metaData.getColumnType(1) === java.sql.Types.DATE)
+ assert(metaData.getPrecision(1) === 25)
+ assert(metaData.getScale(1) === 0)
+ }
+ }
+
+ test("execute statement - select timestamp") {
+ withJdbcStatement() { statement =>
+ val resultSet = statement.executeQuery("SELECT TIMESTAMP '2018-11-17 13:33:33' AS col")
+ assert(resultSet.next())
+ assert(resultSet.getTimestamp("col") === Timestamp.valueOf("2018-11-17 13:33:33"))
+ val metaData = resultSet.getMetaData
+ assert(metaData.getColumnType(1) === java.sql.Types.TIMESTAMP)
+ assert(metaData.getPrecision(1) === 25)
+ assert(metaData.getScale(1) === 0)
+ }
+ }
+
+ ignore("execute statement - select interval") {
+ withJdbcStatement() { statement =>
+ val resultSet = statement.executeQuery("SELECT interval '1' day AS col")
+ assert(resultSet.next())
+ assert(resultSet.getString("col") === "1 days")
+ assert(resultSet.getMetaData.getColumnType(1) === java.sql.Types.VARCHAR)
+ val metaData = resultSet.getMetaData
+ assert(metaData.getPrecision(1) === Int.MaxValue)
+ assert(metaData.getScale(1) === 0)
+ }
+ }
+
+ test("execute statement - select array") {
+ withJdbcStatement() { statement =>
+ val resultSet = statement.executeQuery(
+ "SELECT array() AS col1, array(1) AS col2, array(null) AS col3")
+ assert(resultSet.next())
+ assert(resultSet.getObject("col1") === "[]")
+ assert(resultSet.getObject("col2") === "[1]")
+ assert(resultSet.getObject("col3") === "[null]")
+ val metaData = resultSet.getMetaData
+ assert(metaData.getColumnType(1) === java.sql.Types.VARCHAR)
+ assert(metaData.getPrecision(1) === 25)
+ assert(metaData.getPrecision(2) == 25)
+ assert(metaData.getScale(1) == 0)
+ assert(metaData.getScale(2) == 0)
+ }
+ }
+
+ test("execute statement - select map") {
+ withJdbcStatement() { statement =>
+ val resultSet = statement.executeQuery(
+ "SELECT map() AS col1, map(1, 2, 3, 4) AS col2, map(1, null) AS col3")
+ assert(resultSet.next())
+ assert(resultSet.getObject("col1") === "{}")
+ assert(resultSet.getObject("col2") === "{1:2,3:4}")
+ assert(resultSet.getObject("col3") === "{1:null}")
+ val metaData = resultSet.getMetaData
+ assert(metaData.getColumnType(1) === java.sql.Types.VARCHAR)
+ assert(metaData.getPrecision(1) === 25)
+ assert(metaData.getPrecision(2) == 25)
+ assert(metaData.getScale(1) == 0)
+ assert(metaData.getScale(2) == 0)
+ }
+ }
+
+ test("execute statement - select struct") {
+ withJdbcStatement() { statement =>
+ val resultSet = statement.executeQuery(
+ "SELECT struct('1', '2') AS col1," +
+ " named_struct('a', 2, 'b', 4) AS col2," +
+ " named_struct('a', null, 'b', null) AS col3")
+ assert(resultSet.next())
+ assert(resultSet.getObject("col1") === """{"col1":"1","col2":"2"}""")
+ assert(resultSet.getObject("col2") === """{"a":2,"b":4}""")
+ assert(resultSet.getObject("col3") === """{"a":null,"b":null}""")
+
+ val metaData = resultSet.getMetaData
+ assert(metaData.getColumnType(1) === java.sql.Types.VARCHAR)
+ assert(metaData.getPrecision(1) === 25)
+ assert(metaData.getPrecision(2) == 25)
+ assert(metaData.getScale(1) == 0)
+ assert(metaData.getScale(2) == 0)
+ }
+ }
+
+ test("execute statement - analysis exception") {
+ val sql = "select date_sub(date'2011-11-11', '1.2')"
+
+ withJdbcStatement() { statement =>
+ val e = intercept[SQLException] {
+ statement.executeQuery(sql)
+ }
+ assert(e.getMessage
+ .contains("The second argument of 'date_sub' function needs to be an integer."))
+ }
+ }
+
+ test("execute statement - select with builtin functions") {
+ withJdbcStatement() { statement =>
+ val resultSet = statement.executeQuery("SELECT substring('kentyao', 1)")
+ assert(resultSet.next())
+ assert(resultSet.getString("substring(kentyao, 1, 2147483647)") === "kentyao")
+ val metaData = resultSet.getMetaData
+ assert(metaData.getColumnType(1) === java.sql.Types.VARCHAR)
+ assert(metaData.getPrecision(1) === 25)
+ assert(metaData.getScale(1) === 0)
+ }
+ }
+
+ test("kyuubi defined function - kyuubi_version") {
+ withJdbcStatement() { statement =>
+ val rs = statement.executeQuery("SELECT kyuubi_version()")
+ assert(rs.next())
+ assert(rs.getString(1) == KYUUBI_VERSION)
+ }
+ }
+
+ test("kyuubi defined function - engine_name") {
+ withJdbcStatement() { statement =>
+ val rs = statement.executeQuery("SELECT engine_name()")
+ assert(rs.next())
+ assert(StringUtils.isNotBlank(rs.getString(1)))
+ }
+ }
+
+ test("kyuubi defined function - engine_id") {
+ withJdbcStatement() { statement =>
+ val rs = statement.executeQuery("SELECT engine_id()")
+ assert(rs.next())
+ assert(StringUtils.isNotBlank(rs.getString(1)))
+ }
+ }
+}