This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 56a88f1 [KYUUBI #1395] Implement MySQL protocol codec
56a88f1 is described below
commit 56a88f1e48ccf705793f161f059c2f05a2484aa3
Author: Cheng Pan <[email protected]>
AuthorDate: Wed Nov 17 18:42:01 2021 +0800
[KYUUBI #1395] Implement MySQL protocol codec
### _Why are the changes needed?_
Sub task of #1219
The required `MySQLPacket`s are split into 4 groups, as Kyuubi is designed
to be a "MySQL Server", only part of packets need to be encodable and others
just need to be decodable.
- Generic
- `MySQLOKPacket` with `SupportsEncode`
- `MySQLErrPacket` with `SupportsEncode`
- `MySQLEofPacket` with `SupportsEncode`
- Authentication
- `MySQLHandshakePacket` with `SupportsEncode`
- `MySQLHandshakeResponse41Packet` with `SupportsDecode`
- `MySQLAuthSwitchRequestPacket` with `SupportsEncode`
- `MySQLAuthSwitchResponsePacket` with `SupportsDecode`
- Command
- `MySQLComPingPacket` with `SupportsDecode`
- `MySQLComInitDbPacket` with `SupportsDecode`
- `MySQLComFieldListPacket` with `SupportsDecode`
- `MySQLComQueryPacket` with `SupportsDecode`
- `MySQLComQuitPacket` with `SupportsDecode`
- Data
- `MySQLFieldCountPacket` with`SupportsEncode`
- `MySQLColumnDefinition41Packet` with `SupportsEncode`
- `MySQLTextResultSetRowPacket` with `SupportsEncode`
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests)
locally before make a pull request
Closes #1395 from pan3793/mysql-protocol.
Closes #1395
25b7b3d8 [Cheng Pan] Nit
90637ea1 [Cheng Pan] Fix compile
7fae786e [Cheng Pan] Use while
47a1da82 [Cheng Pan] Add MySQL links
b6d91f7b [Cheng Pan] Implement MySQL protocol codec
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../kyuubi/server/mysql/MySQLCommandPackets.scala | 73 +++++
.../kyuubi/server/mysql/MySQLDataPackets.scala | 108 +++++++
.../kyuubi/server/mysql/MySQLDateTimeUtils.scala | 24 ++
.../kyuubi/server/mysql/MySQLGenericPackets.scala | 110 +++++++
.../apache/kyuubi/server/mysql/MySQLPacket.scala | 39 +++
.../kyuubi/server/mysql/MySQLRichByteBuf.scala | 328 +++++++++++++++++++++
.../mysql/authentication/MySQLAuthPackets.scala | 150 ++++++++++
.../mysql/authentication/MySQLAuthentication.scala | 182 ++++++++++++
.../mysql/authentication/MySQLNativePassword.scala | 72 +++++
.../mysql/constant/MySQLCapabilityFlag.scala | 96 ++++++
.../mysql/constant/MySQLCommandPacketType.scala | 154 ++++++++++
.../server/mysql/constant/MySQLDataType.scala | 182 ++++++++++++
.../server/mysql/constant/MySQLErrorCode.scala | 106 +++++++
.../mysql/constant/MySQLFieldDetailFlag.scala | 54 ++++
.../server/mysql/constant/MySQLServerDefines.scala | 28 ++
.../server/mysql/constant/MySQLStatusFlag.scala | 52 ++++
.../kyuubi/server/mysql/MySQLCodecHelper.scala | 46 +++
.../server/mysql/MySQLCommandPacketSuite.scala | 50 ++++
.../kyuubi/server/mysql/MySQLDataPacketSuite.scala | 45 +++
.../server/mysql/MySQLGenericPacketSuite.scala | 45 +++
.../authentication/MySQLAuthPacketSuite.scala | 111 +++++++
21 files changed, 2055 insertions(+)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLCommandPackets.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLCommandPackets.scala
new file mode 100644
index 0000000..9160f0b
--- /dev/null
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLCommandPackets.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.mysql
+
+import io.netty.buffer.ByteBuf
+
+import org.apache.kyuubi.server.mysql.MySQLRichByteBuf.Implicit
+import org.apache.kyuubi.server.mysql.constant.MySQLCommandPacketType
+
+sealed abstract class MySQLCommandPacket(
+ cmdType: MySQLCommandPacketType
+) extends MySQLPacket {
+ override def sequenceId: Int = 0
+}
+
+case class MySQLComPingPacket()
+ extends MySQLCommandPacket(MySQLCommandPacketType.COM_PING)
+
+case class MySQLComQuitPacket()
+ extends MySQLCommandPacket(MySQLCommandPacketType.COM_QUIT)
+
+object MySQLComInitDbPacket extends SupportsDecode[MySQLComInitDbPacket] {
+ override def decode(payload: ByteBuf): MySQLComInitDbPacket = {
+ val schema = payload.readStringEOF
+ MySQLComInitDbPacket(schema)
+ }
+}
+case class MySQLComInitDbPacket(
+ database: String
+) extends MySQLCommandPacket(MySQLCommandPacketType.COM_INIT_DB)
+
+object MySQLComFieldListPacket extends SupportsDecode[MySQLComFieldListPacket]
{
+ override def decode(payload: ByteBuf): MySQLComFieldListPacket = {
+ val table = payload.readStringNul
+ val fieldWildcard = payload.readStringEOF
+ MySQLComFieldListPacket(table, fieldWildcard)
+ }
+}
+
+case class MySQLComFieldListPacket(
+ table: String,
+ fieldWildcard: String
+) extends MySQLCommandPacket(MySQLCommandPacketType.COM_FIELD_LIST)
+
+object MySQLComQueryPacket extends SupportsDecode[MySQLComQueryPacket] {
+ override def decode(payload: ByteBuf): MySQLComQueryPacket = {
+ val sql = payload.readStringEOF
+ MySQLComQueryPacket(sql)
+ }
+}
+
+case class MySQLComQueryPacket(
+ sql: String
+) extends MySQLCommandPacket(MySQLCommandPacketType.COM_QUERY)
+
+case class MySQLUnsupportedCommandPacket(
+ cmdType: MySQLCommandPacketType
+) extends MySQLCommandPacket(cmdType)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLDataPackets.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLDataPackets.scala
new file mode 100644
index 0000000..624b691
--- /dev/null
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLDataPackets.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.mysql
+
+import java.lang.{Boolean => JBoolean}
+import java.math.BigDecimal
+import java.sql.Timestamp
+import java.time.LocalDateTime
+
+import io.netty.buffer.ByteBuf
+
+import org.apache.kyuubi.server.mysql.MySQLDateTimeUtils._
+import org.apache.kyuubi.server.mysql.MySQLRichByteBuf.Implicit
+import org.apache.kyuubi.server.mysql.constant.{MySQLDataType,
MySQLServerDefines}
+
+case class MySQLFieldCountPacket(
+ sequenceId: Int,
+ columnCount: Int
+) extends MySQLPacket with SupportsEncode {
+
+ override def encode(payload: ByteBuf): Unit = {
+ payload.writeIntLenenc(columnCount)
+ }
+}
+
+case class MySQLColumnDefinition41Packet(
+ sequenceId: Int,
+ flags: Int,
+ name: String,
+ columnLength: Int,
+ columnType: MySQLDataType,
+ decimals: Int
+) extends MySQLPacket with SupportsEncode {
+
+ def nextLength: Int = 0x0c
+
+ def characterSet: Int = MySQLServerDefines.CHARSET
+
+ def catalog: String = ""
+
+ def database: String = ""
+
+ def table: String = ""
+
+ def originalTable: String = ""
+
+ def originalName: String = ""
+
+ def containDefaultValues: Boolean = false
+
+ override def encode(payload: ByteBuf): Unit = {
+ payload.writeStringLenenc(catalog)
+ payload.writeStringLenenc(database)
+ payload.writeStringLenenc(table)
+ payload.writeStringLenenc(originalTable)
+ payload.writeStringLenenc(name)
+ payload.writeStringLenenc(originalName)
+ payload.writeIntLenenc(nextLength)
+ payload.writeInt2(characterSet)
+ payload.writeInt4(columnLength)
+ payload.writeInt1(columnType.value)
+ payload.writeInt2(flags)
+ payload.writeInt1(decimals)
+ payload.writeReserved(2)
+ if (containDefaultValues) {
+ payload.writeIntLenenc(0)
+ payload.writeStringLenenc("")
+ }
+ }
+}
+
+case class MySQLTextResultSetRowPacket(
+ sequenceId: Int,
+ row: Seq[Any]
+) extends MySQLPacket with SupportsEncode {
+
+ private def nullVal = 0xfb
+
+ override def encode(payload: ByteBuf): Unit = {
+ row.foreach {
+ case null => payload.writeInt1(nullVal)
+ // TODO check all possible data types returned from backend service
+ case bytes: Array[Byte] => payload.writeBytesLenenc(bytes)
+ case ts: Timestamp if ts.getNanos == 0 =>
+ payload.writeStringLenenc(ts.toString.split("\\.")(0))
+ case decimal: BigDecimal =>
payload.writeStringLenenc(decimal.toPlainString)
+ case JBoolean.TRUE | true => payload.writeBytesLenenc(Array[Byte]('1'))
+ case JBoolean.FALSE | false => payload.writeBytesLenenc(Array[Byte]('0'))
+ case time: LocalDateTime => payload.writeStringLenenc(dtFmt.format(time))
+ case other => payload.writeStringLenenc(other.toString)
+ }
+ }
+}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLDateTimeUtils.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLDateTimeUtils.scala
new file mode 100644
index 0000000..89409bc
--- /dev/null
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLDateTimeUtils.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.mysql
+
+import java.time.format.DateTimeFormatter
+
+object MySQLDateTimeUtils {
+ val dtFmt: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd
HH:mm:ss")
+}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLGenericPackets.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLGenericPackets.scala
new file mode 100644
index 0000000..4e9e40e
--- /dev/null
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLGenericPackets.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.mysql
+
+import java.sql.SQLException
+
+import io.netty.buffer.ByteBuf
+
+import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.server.mysql.MySQLRichByteBuf.Implicit
+import org.apache.kyuubi.server.mysql.constant._
+
+case class MySQLOKPacket(
+ sequenceId: Int = 0,
+ affectedRows: Long = 0L,
+ lastInsertId: Long = 0L
+) extends MySQLPacket with SupportsEncode {
+
+ def header: Int = 0x00
+
+ def statusFlag: MySQLStatusFlag = MySQLStatusFlag.SERVER_STATUS_AUTOCOMMIT
+
+ def warnings: Int = 0
+
+ def info: String = ""
+
+ override def encode(payload: ByteBuf): Unit = {
+ payload.writeInt1(header)
+ payload.writeIntLenenc(affectedRows)
+ payload.writeIntLenenc(lastInsertId)
+ payload.writeInt2(statusFlag.value)
+ payload.writeInt2(warnings)
+ payload.writeStringEOF(info)
+ }
+}
+
+object MySQLErrPacket {
+ def apply(cause: Throwable): MySQLErrPacket = {
+ cause match {
+ case kse: KyuubiSQLException if kse.getCause != null =>
+ // prefer brief nested error message instead of whole stacktrace
+ apply(kse.getCause)
+ case e: Exception if e.getMessage contains "NoSuchDatabaseException" =>
+ MySQLErrPacket(1, MySQLErrorCode.ER_BAD_DB_ERROR, cause.getMessage)
+ case se: SQLException if se.getSQLState == null =>
+ MySQLErrPacket(1, MySQLErrorCode.ER_INTERNAL_ERROR, cause.getMessage)
+ case se: SQLException =>
+ MySQLErrPacket(1, MySQLErrorCode(se.getErrorCode, se.getSQLState,
se.getMessage))
+ case _ =>
+ MySQLErrPacket(1, MySQLErrorCode.UNKNOWN_EXCEPTION, cause.getMessage)
+ }
+ }
+}
+
+case class MySQLErrPacket(
+ sequenceId: Int,
+ sqlErrorCode: MySQLErrorCode,
+ errMsgArgs: String*
+) extends MySQLPacket with SupportsEncode {
+
+ def header: Int = 0xff
+
+ def sqlStateMarker: String = "#"
+
+ def errorCode: Int = sqlErrorCode.errorCode
+
+ def sqlState: String = sqlErrorCode.sqlState
+
+ def errorMessage: String = sqlErrorCode.errorMessage format (errMsgArgs: _*)
+
+ override def encode(payload: ByteBuf): Unit = {
+ payload.writeInt1(header)
+ payload.writeInt2(errorCode)
+ payload.writeStringFix(sqlStateMarker)
+ payload.writeStringFix(sqlState)
+ payload.writeStringEOF(errorMessage)
+ }
+}
+
+case class MySQLEofPacket(
+ sequenceId: Int = 0
+) extends MySQLPacket with SupportsEncode {
+
+ def header: Int = 0xfe
+
+ def warnings: Int = 0
+
+ def statusFlags: MySQLStatusFlag = MySQLStatusFlag.SERVER_STATUS_AUTOCOMMIT
+
+ override def encode(payload: ByteBuf): Unit = {
+ payload.writeInt1(header)
+ payload.writeInt2(warnings)
+ payload.writeInt2(statusFlags.value)
+ }
+}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLPacket.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLPacket.scala
new file mode 100644
index 0000000..461a429
--- /dev/null
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLPacket.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.mysql
+
+import io.netty.buffer.ByteBuf
+
+/**
+ * The required `MySQLPacket`s are split into 4 groups, which are GENERIC,
AUTHENTICATION,
+ * COMMAND and DATA.
+ * <p>
+ * As Kyuubi is designed to be a "MySQL Server", only part of packets need to
be encodable
+ * and others just need to be decodable.
+ */
+trait MySQLPacket {
+ def sequenceId: Int
+}
+
+trait SupportsEncode extends MySQLPacket {
+ def encode(payload: ByteBuf): Unit
+}
+
+trait SupportsDecode[T <: MySQLPacket] {
+ def decode(payload: ByteBuf): T
+}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLRichByteBuf.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLRichByteBuf.scala
new file mode 100644
index 0000000..70a59bb
--- /dev/null
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLRichByteBuf.scala
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.mysql
+
+import java.nio.charset.StandardCharsets
+
+import io.netty.buffer.ByteBuf
+
+// https://dev.mysql.com/doc/internals/en/integer.html#packet-Protocol
+object MySQLRichByteBuf {
+
+ private def charset = StandardCharsets.UTF_8
+
+ implicit class Implicit(self: ByteBuf) {
+
+ /**
+ * Read 1 byte fixed length integer from byte buffers.
+ *
+ * @return 1 byte fixed length integer
+ */
+ def readInt1: Int = self.readUnsignedByte
+
+ /**
+ * Write 1 byte fixed length integer to byte buffers.
+ *
+ * @param value 1 byte fixed length integer
+ */
+ def writeInt1(value: Int): ByteBuf = self.writeByte(value)
+
+ /**
+ * Read 2 byte fixed length integer from byte buffers.
+ *
+ * @return 2 byte fixed length integer
+ */
+ def readInt2: Int = self.readUnsignedShortLE
+
+ /**
+ * Write 2 byte fixed length integer to byte buffers.
+ *
+ * @param value 2 byte fixed length integer
+ */
+ def writeInt2(value: Int): ByteBuf = self.writeShortLE(value)
+
+ /**
+ * Read 3 byte fixed length integer from byte buffers.
+ *
+ * @return 3 byte fixed length integer
+ */
+ def readInt3: Int = self.readUnsignedMediumLE
+
+ /**
+ * Write 3 byte fixed length integer to byte buffers.
+ *
+ * @param value 3 byte fixed length integer
+ */
+ def writeInt3(value: Int): ByteBuf = self.writeMediumLE(value)
+
+ /**
+ * Read 4 byte fixed length integer from byte buffers.
+ *
+ * @return 4 byte fixed length integer
+ */
+ def readInt4: Int = self.readIntLE
+
+ /**
+ * Write 4 byte fixed length integer to byte buffers.
+ *
+ * @param value 4 byte fixed length integer
+ */
+ def writeInt4(value: Int): ByteBuf = self.writeIntLE(value)
+
+ /**
+ * Read 6 byte fixed length integer from byte buffers.
+ *
+ * @return 6 byte fixed length integer
+ */
+ def readInt6: Long = {
+ var result = 0L
+ var i = 0
+ while (i < 6) {
+ result |= (0xff & self.readByte).toLong << (8 * i)
+ i = i + 1
+ }
+ result
+ }
+
+ /**
+ * Write 6 byte fixed length integer to byte buffers.
+ *
+ * @param value 6 byte fixed length integer
+ */
+ def writeInt6(value: Long): ByteBuf = throw new
UnsupportedOperationException
+
+ /**
+ * Read 8 byte fixed length integer from byte buffers.
+ *
+ * @return 8 byte fixed length integer
+ */
+ def readInt8: Long = self.readLongLE
+
+ /**
+ * Write 8 byte fixed length integer to byte buffers.
+ *
+ * @param value 8 byte fixed length integer
+ */
+ def writeInt8(value: Long): ByteBuf = self.writeLongLE(value)
+
+ /**
+ * Read lenenc integer from byte buffers.
+ *
+ * @return lenenc integer
+ */
+ def readIntLenenc: Long = {
+ val firstByte = readInt1
+ if (firstByte < 0xfb) return firstByte
+ if (0xfb == firstByte) return 0
+ if (0xfc == firstByte) return readInt2
+ if (0xfd == firstByte) return readInt3
+ self.readLongLE
+ }
+
+ /**
+ * Write lenenc integer to byte buffers.
+ *
+ * @param value lenenc integer
+ */
+ def writeIntLenenc(value: Long): ByteBuf = {
+ if (value < 0xfb) {
+ self.writeByte(value.toInt)
+ } else if (value < (1 << 16)) {
+ self.writeByte(0xfc)
+ self.writeShortLE(value.toInt)
+ } else if (value < (1 << 24)) {
+ self.writeByte(0xfd)
+ self.writeMediumLE(value.toInt)
+ } else {
+ self.writeByte(0xfe)
+ self.writeLongLE(value)
+ }
+ }
+
+ /**
+ * Read fixed length long from byte buffers.
+ *
+ * @param length length read from byte buffers
+ * @return fixed length long
+ */
+ def readLong(length: Int): Long = {
+ var result = 0L
+ var i = 0
+ while (i < length) {
+ result = result << 8 | readInt1
+ i = i + 1
+ }
+ result
+ }
+
+ /**
+ * Read lenenc string from byte buffers.
+ *
+ * @return lenenc string
+ */
+ def readStringLenenc: String = {
+ val length = readIntLenenc.toInt
+ val result = new Array[Byte](length)
+ self.readBytes(result)
+ new String(result, charset)
+ }
+
+ /**
+ * Read lenenc string from byte buffers for bytes.
+ *
+ * @return lenenc bytes
+ */
+ def readStringLenencByBytes: Array[Byte] = {
+ val length = readIntLenenc.toInt
+ val result = new Array[Byte](length)
+ self.readBytes(result)
+ result
+ }
+
+ /**
+ * Write lenenc string to byte buffers.
+ *
+ * @param value fixed length string
+ */
+ def writeStringLenenc(value: String): ByteBuf = {
+ val bytes = value.getBytes(charset)
+ writeIntLenenc(bytes.length)
+ self.writeBytes(bytes)
+ }
+
+ /**
+ * Write lenenc bytes to byte buffers.
+ *
+ * @param value fixed length bytes
+ */
+ def writeBytesLenenc(value: Array[Byte]): ByteBuf = {
+ if (0 == value.length) {
+ self.writeByte(0)
+ return self
+ }
+ writeIntLenenc(value.length)
+ self.writeBytes(value)
+ }
+
+ /**
+ * Read fixed length string from byte buffers.
+ *
+ * @param length length of fixed string
+ * @return fixed length string
+ */
+ def readStringFix(length: Int): String = new
String(readStringFixByBytes(length), charset)
+
+ /**
+ * Read fixed length string from byte buffers and return bytes.
+ *
+ * @param length length of fixed string
+ * @return fixed length bytes
+ */
+ def readStringFixByBytes(length: Int): Array[Byte] = {
+ val result = new Array[Byte](length)
+ self.readBytes(result)
+ result
+ }
+
+ /**
+ * Write variable length string to byte buffers.
+ *
+ * @param value fixed length string
+ */
+ def writeStringFix(value: String): ByteBuf =
self.writeBytes(value.getBytes(charset))
+
+ /**
+ * Write variable length bytes to byte buffers.
+ *
+ * @param value fixed length bytes
+ */
+ def writeBytes(value: Array[Byte]): ByteBuf = self.writeBytes(value)
+
+ /**
+ * Read null terminated string from byte buffers.
+ *
+ * @return null terminated string
+ */
+ def readStringNul: String = new String(readStringNulByBytes, charset)
+
+ /**
+ * Read null terminated string from byte buffers and return bytes.
+ *
+ * @return null terminated bytes
+ */
+ def readStringNulByBytes: Array[Byte] = {
+ val result = new Array[Byte](self.bytesBefore(0.toByte))
+ self.readBytes(result)
+ self.skipBytes(1)
+ result
+ }
+
+ /**
+ * Write null terminated string to byte buffers.
+ *
+ * @param value null terminated string
+ */
+ def writeStringNul(value: String): ByteBuf = {
+ self.writeBytes(value.getBytes(charset))
+ self.writeByte(0)
+ }
+
+ /**
+ * Read rest of packet string from byte buffers and return bytes.
+ *
+ * @return rest of packet string bytes
+ */
+ def readStringEOFByBytes: Array[Byte] = {
+ val result = new Array[Byte](self.readableBytes)
+ self.readBytes(result)
+ result
+ }
+
+ /**
+ * Read rest of packet string from byte buffers.
+ *
+ * @return rest of packet string
+ */
+ def readStringEOF: String = {
+ val result = new Array[Byte](self.readableBytes)
+ self.readBytes(result)
+ new String(result, charset)
+ }
+
+ /**
+ * Write rest of packet string to byte buffers.
+ *
+ * @param value rest of packet string
+ */
+ def writeStringEOF(value: String): ByteBuf =
self.writeBytes(value.getBytes(charset))
+
+ /**
+ * Skip reserved from byte buffers.
+ *
+ * @param length length of reserved
+ */
+ def skipReserved(length: Int): ByteBuf = self.skipBytes(length)
+
+ /**
+ * Write null for reserved to byte buffers.
+ *
+ * @param length length of reserved
+ */
+ def writeReserved(length: Int): ByteBuf = self.writeZero(length)
+ }
+}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/authentication/MySQLAuthPackets.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/authentication/MySQLAuthPackets.scala
new file mode 100644
index 0000000..0e731bd
--- /dev/null
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/authentication/MySQLAuthPackets.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.mysql.authentication
+
+import io.netty.buffer.ByteBuf
+
+import org.apache.kyuubi.server.mysql._
+import org.apache.kyuubi.server.mysql.MySQLRichByteBuf.Implicit
+import org.apache.kyuubi.server.mysql.constant._
+
+case class MySQLAuthSwitchRequestPacket(
+ sequenceId: Int,
+ authPluginName: String,
+ authPluginData: MySQLNativePassword.PluginData
+) extends MySQLPacket with SupportsEncode {
+
+ def header: Int = 0xfe
+
+ override def encode(payload: ByteBuf): Unit = {
+ payload.writeInt1(header)
+ payload.writeStringNul(authPluginName)
+ payload.writeStringNul(new String(authPluginData.full))
+ }
+}
+
+object MySQLAuthSwitchResponsePacket extends
SupportsDecode[MySQLAuthSwitchResponsePacket] {
+
+ override def decode(payload: ByteBuf): MySQLAuthSwitchResponsePacket = {
+ val _sequenceId = payload.readInt1
+ val _authPluginResponse = payload.readStringEOFByBytes
+ MySQLAuthSwitchResponsePacket(_sequenceId, _authPluginResponse)
+ }
+}
+
+case class MySQLAuthSwitchResponsePacket(
+ sequenceId: Int,
+ authPluginResponse: Array[Byte]
+) extends MySQLPacket
+
+case class MySQLHandshakePacket(
+ connectionId: Int,
+ authPluginData: MySQLNativePassword.PluginData
+) extends MySQLPacket with SupportsEncode {
+
+ def protocolVersion: Int = MySQLServerDefines.PROTOCOL_VERSION
+
+ def serverVersion: String = MySQLServerDefines.MYSQL_KYUUBI_SERVER_VERSION
+
+ def statusFlag: MySQLStatusFlag = MySQLStatusFlag.SERVER_STATUS_AUTOCOMMIT
+
+ def charset: Int = MySQLServerDefines.CHARSET
+
+ def capabilityFlagsLower: Int = MySQLCapabilityFlag.handshakeValueLower
+
+ def capabilityFlagsUpper: Int = MySQLCapabilityFlag.handshakeValueUpper
+
+ def authPluginName: String = MySQLAuthenticationMethod.NATIVE_PASSWORD.method
+
+ override def sequenceId: Int = 0
+
+ override def encode(payload: ByteBuf): Unit = {
+ payload.writeInt1(protocolVersion)
+ payload.writeStringNul(serverVersion)
+ payload.writeInt4(connectionId)
+ payload.writeStringNul(new String(authPluginData.part1))
+ payload.writeInt2(capabilityFlagsLower)
+ payload.writeInt1(charset)
+ payload.writeInt2(statusFlag.value)
+ payload.writeInt2(capabilityFlagsUpper)
+ payload.writeInt1(if (isClientPluginAuth) authPluginData.full.length + 1
else 0)
+ payload.writeReserved(10)
+ if (isClientSecureConnection) payload.writeStringNul(new
String(authPluginData.part2))
+ if (isClientPluginAuth) payload.writeStringNul(authPluginName)
+ }
+
+ private def isClientSecureConnection =
+ (capabilityFlagsLower & MySQLCapabilityFlag.CLIENT_SECURE_CONNECTION.value
& 0x00000ffff) != 0
+
+ private def isClientPluginAuth =
+ (capabilityFlagsUpper & MySQLCapabilityFlag.CLIENT_PLUGIN_AUTH.value >>
16) != 0
+}
+
+object MySQLHandshakeResponse41Packet extends
SupportsDecode[MySQLHandshakeResponse41Packet] {
+ override def decode(payload: ByteBuf): MySQLHandshakeResponse41Packet = {
+ val sequenceId = payload.readInt1
+ val capabilityFlags = payload.readInt4
+ val maxPacketSize = payload.readInt4
+ val characterSet = payload.readInt1
+ payload.skipReserved(23)
+ val username = payload.readStringNul
+ val authResponse = readAuthResponse(payload, capabilityFlags)
+ val database = readDatabase(payload, capabilityFlags)
+ val authPluginName = readAuthPluginName(payload, capabilityFlags)
+ MySQLHandshakeResponse41Packet(
+ sequenceId,
+ capabilityFlags,
+ maxPacketSize,
+ characterSet,
+ username,
+ authResponse,
+ database,
+ authPluginName)
+ }
+
+ private def readAuthResponse(payload: ByteBuf, capabilityFlags: Int):
Array[Byte] = {
+ if (0 != (capabilityFlags &
MySQLCapabilityFlag.CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA.value)) {
+ return payload.readStringLenencByBytes
+ }
+ if (0 != (capabilityFlags &
MySQLCapabilityFlag.CLIENT_SECURE_CONNECTION.value)) {
+ val length = payload.readInt1
+ return payload.readStringFixByBytes(length)
+ }
+ payload.readStringNulByBytes
+ }
+
+ private def readDatabase(payload: ByteBuf, capabilityFlags: Int): String =
+ if (0 != (capabilityFlags &
MySQLCapabilityFlag.CLIENT_CONNECT_WITH_DB.value)) {
+ payload.readStringNul
+ } else null
+
+ private def readAuthPluginName(payload: ByteBuf, capabilityFlags: Int):
String =
+ if (0 != (capabilityFlags & MySQLCapabilityFlag.CLIENT_PLUGIN_AUTH.value))
payload.readStringNul
+ else null
+}
+
+case class MySQLHandshakeResponse41Packet(
+ sequenceId: Int,
+ capabilityFlags: Int,
+ maxPacketSize: Int,
+ characterSet: Int,
+ username: String,
+ authResponse: Array[Byte],
+ database: String,
+ authPluginName: String
+) extends MySQLPacket
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/authentication/MySQLAuthentication.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/authentication/MySQLAuthentication.scala
new file mode 100644
index 0000000..25c9f3f
--- /dev/null
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/authentication/MySQLAuthentication.scala
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.mysql.authentication
+
+import java.net.InetSocketAddress
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.util.Random
+
+import io.netty.buffer.ByteBuf
+import io.netty.channel.ChannelHandlerContext
+
+import org.apache.kyuubi.server.mysql._
+import org.apache.kyuubi.server.mysql.authentication.MySQLAuthentication._
+import org.apache.kyuubi.server.mysql.constant._
+
+object MySQLAuthentication {
+
+ private val seed: Array[Byte] = Array(
+ // format: off
+ 'a', 'b', 'e', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm',
+ 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
+ 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M',
+ 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z',
+ '0', '1', '2', '3', '4', '5', '6', '7', '8', '9')
+ // format: on
+
+ def randomBytes(length: Int): Array[Byte] = {
+ val result = new Array[Byte](length)
+ var i = 0
+ while (i < length) {
+ result(i) = seed(Random.nextInt(seed.length))
+ i = i + 1
+ }
+ result
+ }
+
+ final val connIdCounter = new AtomicInteger
+}
+
+object MySQLConnectionPhase extends Enumeration {
+ type MySQLConnectionPhase = Value
+
+ val INITIAL_HANDSHAKE, AUTH_PHASE_FAST_PATH, AUTHENTICATION_METHOD_MISMATCH
= Value
+}
+
+sealed abstract class MySQLAuthenticationMethod(val method: String)
+
+object MySQLAuthenticationMethod {
+
+ object OLD_PASSWORD extends MySQLAuthenticationMethod("mysql_old_password")
+
+ // Currently, it's the ONLY supported authentication method
+ object NATIVE_PASSWORD extends
MySQLAuthenticationMethod("mysql_native_password")
+
+ object CLEAR_TEXT extends MySQLAuthenticationMethod("mysql_clear_password")
+
+ object WINDOWS_NATIVE extends
MySQLAuthenticationMethod("authentication_windows_client")
+
+ object SHA256 extends MySQLAuthenticationMethod("sha256_password")
+}
+
+case class AuthenticationResult(
+ user: String,
+ ip: String,
+ database: String,
+ finished: Boolean
+)
+
+object AuthenticationResult {
+ def finished(username: String, ip: String, database: String):
AuthenticationResult =
+ new AuthenticationResult(username, ip, database, true)
+
+ def continued: AuthenticationResult =
+ new AuthenticationResult(null, null, null, false)
+
+ def continued(username: String, ip: String, database: String):
AuthenticationResult =
+ new AuthenticationResult(username, ip, database, false)
+}
+
+class MySQLAuthenticationEngine {
+ private final val authenticator = new MySQLNativePassword
+ private final val currentSeqId = new AtomicInteger
+ private var connectionPhase = MySQLConnectionPhase.INITIAL_HANDSHAKE
+ private var authResponse: Array[Byte] = _
+ private var authResult: AuthenticationResult = _
+
+ def handshake(ctx: ChannelHandlerContext): Int = {
+ val connectionId = connIdCounter.getAndIncrement
+ connectionPhase = MySQLConnectionPhase.AUTH_PHASE_FAST_PATH
+ ctx.writeAndFlush(MySQLHandshakePacket(connectionId,
authenticator.pluginData))
+ connectionId
+ }
+
+ def authenticate(ctx: ChannelHandlerContext, buf: ByteBuf):
AuthenticationResult = {
+ connectionPhase match {
+ case MySQLConnectionPhase.AUTHENTICATION_METHOD_MISMATCH =>
+ authenticationMethodMismatch(buf)
+ case MySQLConnectionPhase.AUTH_PHASE_FAST_PATH =>
+ authResult = authPhaseFastPath(ctx, buf)
+ if (!authResult.finished) return authResult
+ case _ => // never happen
+ }
+ val seqId = currentSeqId.incrementAndGet
+ val responsePacket = authenticator
+ .login(authResult.user, remoteAddress(ctx), authResponse,
authResult.database)
+ .map(createErrorPacket(ctx, _, seqId))
+ .getOrElse(MySQLOKPacket(seqId))
+ ctx.writeAndFlush(responsePacket)
+
+ AuthenticationResult.finished(authResult.user, remoteAddress(ctx),
authResult.database)
+ }
+
+ private def authPhaseFastPath(ctx: ChannelHandlerContext, buf: ByteBuf):
AuthenticationResult = {
+ val packet = MySQLHandshakeResponse41Packet.decode(buf)
+ authResponse = packet.authResponse
+ currentSeqId.set(packet.sequenceId)
+ // always switch to mysql_native_password since Kyuubi Server only support
this method
+ if (isClientPluginAuth(packet)
+ && packet.authPluginName !=
MySQLAuthenticationMethod.NATIVE_PASSWORD.method) {
+ connectionPhase = MySQLConnectionPhase.AUTHENTICATION_METHOD_MISMATCH
+ ctx.writeAndFlush(MySQLAuthSwitchRequestPacket(
+ currentSeqId.incrementAndGet,
+ MySQLAuthenticationMethod.NATIVE_PASSWORD.method,
+ authenticator.pluginData))
+ return AuthenticationResult.continued(packet.username,
remoteAddress(ctx), packet.database)
+ }
+ AuthenticationResult.finished(packet.username, remoteAddress(ctx),
packet.database)
+ }
+
+ private def isClientPluginAuth(packet: MySQLHandshakeResponse41Packet) =
+ (packet.capabilityFlags & MySQLCapabilityFlag.CLIENT_PLUGIN_AUTH.value) != 0
+
+ private def authenticationMethodMismatch(buf: ByteBuf): Unit = {
+ val packet = MySQLAuthSwitchResponsePacket.decode(buf)
+ currentSeqId.set(packet.sequenceId)
+ authResponse = packet.authPluginResponse
+ }
+
+ private def createErrorPacket(
+ ctx: ChannelHandlerContext,
+ errorCode: MySQLErrorCode,
+ seqId: Int
+ ): MySQLErrPacket = errorCode match {
+ case MySQLErrorCode.ER_DBACCESS_DENIED_ERROR => MySQLErrPacket(
+ seqId,
+ MySQLErrorCode.ER_DBACCESS_DENIED_ERROR,
+ authResult.user,
+ remoteAddress(ctx),
+ authResult.database)
+ case _ => MySQLErrPacket(
+ seqId,
+ MySQLErrorCode.ER_ACCESS_DENIED_ERROR,
+ authResult.user,
+ remoteAddress(ctx),
+ errorMessage)
+ }
+
+ private def errorMessage = if (authResponse.nonEmpty) "YES" else "NO"
+
+ private def remoteAddress(ctx: ChannelHandlerContext): String = {
+ ctx.channel.remoteAddress match {
+ case address: InetSocketAddress => address.getAddress.getHostAddress
+ case other => other.toString
+ }
+ }
+}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/authentication/MySQLNativePassword.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/authentication/MySQLNativePassword.scala
new file mode 100644
index 0000000..c238186
--- /dev/null
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/authentication/MySQLNativePassword.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.mysql.authentication
+
+import java.util
+
+import org.apache.commons.codec.digest.DigestUtils
+
+import
org.apache.kyuubi.server.mysql.authentication.MySQLAuthentication.randomBytes
+import
org.apache.kyuubi.server.mysql.authentication.MySQLNativePassword.PluginData
+import org.apache.kyuubi.server.mysql.constant.MySQLErrorCode
+
+object MySQLNativePassword {
+ case class PluginData(
+ part1: Array[Byte] = randomBytes(8),
+ part2: Array[Byte] = randomBytes(12)
+ ) {
+ lazy val full: Array[Byte] = Array.concat(part1, part2)
+ }
+}
+
+class MySQLNativePassword {
+ private final val _pluginData = new PluginData
+
+ def pluginData: PluginData = _pluginData
+
+ def login(
+ user: String,
+ host: String,
+ authResp: Array[Byte],
+ database: String
+ ): Option[MySQLErrorCode] = {
+ if (isPasswordRight("kyuubi", authResp)) {
+ // if (true) {
+ None
+ } else {
+ Some(MySQLErrorCode.ER_ACCESS_DENIED_ERROR)
+ }
+ }
+
+ private[authentication] def isPasswordRight(password: String,
authentication: Array[Byte]) =
+ util.Arrays.equals(getAuthCipherBytes(password), authentication)
+
+ private def getAuthCipherBytes(password: String): Array[Byte] = {
+ val salt = pluginData.full
+ val passwordSha1 = DigestUtils.sha1(password)
+ val passwordSha1Sha1 = DigestUtils.sha1(passwordSha1)
+ val secret = new Array[Byte](salt.length + passwordSha1Sha1.length)
+ System.arraycopy(salt, 0, secret, 0, salt.length)
+ System.arraycopy(passwordSha1Sha1, 0, secret, salt.length,
passwordSha1Sha1.length)
+ val secretSha1 = DigestUtils.sha1(secret)
+ xor(passwordSha1, secretSha1)
+ }
+
+ private def xor(input: Array[Byte], secret: Array[Byte]): Array[Byte] =
+ (input zip secret).map { case (b1, b2) => (b1 ^ b2).toByte }
+}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/constant/MySQLCapabilityFlag.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/constant/MySQLCapabilityFlag.scala
new file mode 100644
index 0000000..b3af5ea
--- /dev/null
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/constant/MySQLCapabilityFlag.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.mysql.constant
+
+sealed abstract class MySQLCapabilityFlag(val value: Int)
+
+//
https://dev.mysql.com/doc/dev/mysql-server/latest/group__group__cs__capabilities__flags.html
+object MySQLCapabilityFlag {
+
+ object CLIENT_LONG_PASSWORD extends MySQLCapabilityFlag(0x00000001)
+
+ object CLIENT_FOUND_ROWS extends MySQLCapabilityFlag(0x00000002)
+
+ object CLIENT_LONG_FLAG extends MySQLCapabilityFlag(0x00000004)
+
+ object CLIENT_CONNECT_WITH_DB extends MySQLCapabilityFlag(0x00000008)
+
+ object CLIENT_NO_SCHEMA extends MySQLCapabilityFlag(0x00000010)
+
+ object CLIENT_COMPRESS extends MySQLCapabilityFlag(0x00000020)
+
+ object CLIENT_ODBC extends MySQLCapabilityFlag(0x00000040)
+
+ object CLIENT_LOCAL_FILES extends MySQLCapabilityFlag(0x00000080)
+
+ object CLIENT_IGNORE_SPACE extends MySQLCapabilityFlag(0x00000100)
+
+ object CLIENT_PROTOCOL_41 extends MySQLCapabilityFlag(0x00000200)
+
+ object CLIENT_INTERACTIVE extends MySQLCapabilityFlag(0x00000400)
+
+ object CLIENT_SSL extends MySQLCapabilityFlag(0x00000800)
+
+ object CLIENT_IGNORE_SIGPIPE extends MySQLCapabilityFlag(0x00001000)
+
+ object CLIENT_TRANSACTIONS extends MySQLCapabilityFlag(0x00002000)
+
+ object CLIENT_RESERVED extends MySQLCapabilityFlag(0x00004000)
+
+ object CLIENT_SECURE_CONNECTION extends MySQLCapabilityFlag(0x00008000)
+
+ object CLIENT_MULTI_STATEMENTS extends MySQLCapabilityFlag(0x00010000)
+
+ object CLIENT_MULTI_RESULTS extends MySQLCapabilityFlag(0x00020000)
+
+ object CLIENT_PS_MULTI_RESULTS extends MySQLCapabilityFlag(0x00040000)
+
+ object CLIENT_PLUGIN_AUTH extends MySQLCapabilityFlag(0x00080000)
+
+ object CLIENT_CONNECT_ATTRS extends MySQLCapabilityFlag(0x00100000)
+
+ object CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA extends
MySQLCapabilityFlag(0x00200000)
+
+ object CLIENT_CAN_HANDLE_EXPIRED_PASSWORDS extends
MySQLCapabilityFlag(0x00400000)
+
+ object CLIENT_SESSION_TRACK extends MySQLCapabilityFlag(0x00800000)
+
+ // since MySQL 5.7.15, disable it for compatible
+ object CLIENT_DEPRECATE_EOF extends MySQLCapabilityFlag(0x01000000)
+
+ val handshakeValue: Int = calculateValues(
+ CLIENT_LONG_PASSWORD,
+ CLIENT_FOUND_ROWS,
+ CLIENT_LONG_FLAG,
+ CLIENT_CONNECT_WITH_DB,
+ CLIENT_ODBC,
+ CLIENT_IGNORE_SPACE,
+ CLIENT_PROTOCOL_41,
+ CLIENT_INTERACTIVE,
+ CLIENT_IGNORE_SIGPIPE,
+ CLIENT_TRANSACTIONS,
+ CLIENT_SECURE_CONNECTION,
+ CLIENT_PLUGIN_AUTH)
+
+ val handshakeValueLower: Int = handshakeValue & 0x0000ffff
+
+ val handshakeValueUpper: Int = handshakeValue >>> 16
+
+ private def calculateValues(capabilities: MySQLCapabilityFlag*): Int =
+ capabilities.foldLeft(0) { case (acc, item) => acc | item.value }
+}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/constant/MySQLCommandPacketType.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/constant/MySQLCommandPacketType.scala
new file mode 100644
index 0000000..d0466bb
--- /dev/null
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/constant/MySQLCommandPacketType.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.mysql.constant
+
+sealed abstract class MySQLCommandPacketType(val value: Int)
+
+object MySQLCommandPacketType {
+
+ // https://dev.mysql.com/doc/internals/en/com-sleep.html
+ object COM_SLEEP extends MySQLCommandPacketType(0x00)
+
+ // https://dev.mysql.com/doc/internals/en/com-quit.html
+ object COM_QUIT extends MySQLCommandPacketType(0x01)
+
+ // https://dev.mysql.com/doc/internals/en/com-init-db.html
+ object COM_INIT_DB extends MySQLCommandPacketType(0x02)
+
+ // https://dev.mysql.com/doc/internals/en/com-query.html
+ object COM_QUERY extends MySQLCommandPacketType(0x03)
+
+ // https://dev.mysql.com/doc/internals/en/com-field-list.html
+ object COM_FIELD_LIST extends MySQLCommandPacketType(0x04)
+
+ // https://dev.mysql.com/doc/internals/en/com-create-db.html
+ object COM_CREATE_DB extends MySQLCommandPacketType(0x05)
+
+ // https://dev.mysql.com/doc/internals/en/com-drop-db.html
+ object COM_DROP_DB extends MySQLCommandPacketType(0x06)
+
+ // https://dev.mysql.com/doc/internals/en/com-refresh.html
+ object COM_REFRESH extends MySQLCommandPacketType(0x07)
+
+ // https://dev.mysql.com/doc/internals/en/com-shutdown.html
+ object COM_SHUTDOWN extends MySQLCommandPacketType(0x08)
+
+ // https://dev.mysql.com/doc/internals/en/com-statistics.html
+ object COM_STATISTICS extends MySQLCommandPacketType(0x09)
+
+ // https://dev.mysql.com/doc/internals/en/com-process-info.html
+ object COM_PROCESS_INFO extends MySQLCommandPacketType(0x0a)
+
+ // https://dev.mysql.com/doc/internals/en/com-connect.html
+ object COM_CONNECT extends MySQLCommandPacketType(0x0b)
+
+ // https://dev.mysql.com/doc/internals/en/com-process-kill.html
+ object COM_PROCESS_KILL extends MySQLCommandPacketType(0x0c)
+
+ // https://dev.mysql.com/doc/internals/en/com-debug.html
+ object COM_DEBUG extends MySQLCommandPacketType(0x0d)
+
+ // https://dev.mysql.com/doc/internals/en/com-ping.html
+ object COM_PING extends MySQLCommandPacketType(0x0e)
+
+ // https://dev.mysql.com/doc/internals/en/com-time.html
+ object COM_TIME extends MySQLCommandPacketType(0x0f)
+
+ // https://dev.mysql.com/doc/internals/en/com-delayed-insert.html
+ object COM_DELAYED_INSERT extends MySQLCommandPacketType(0x10)
+
+ // https://dev.mysql.com/doc/internals/en/com-change-user.html
+ object COM_CHANGE_USER extends MySQLCommandPacketType(0x11)
+
+ // https://dev.mysql.com/doc/internals/en/com-binlog-dump.html
+ object COM_BINLOG_DUMP extends MySQLCommandPacketType(0x12)
+
+ // https://dev.mysql.com/doc/internals/en/com-table-dump.html
+ object COM_TABLE_DUMP extends MySQLCommandPacketType(0x13)
+
+ // https://dev.mysql.com/doc/internals/en/com-connect-out.html
+ object COM_CONNECT_OUT extends MySQLCommandPacketType(0x14)
+
+ // https://dev.mysql.com/doc/internals/en/com-register-slave.html
+ object COM_REGISTER_SLAVE extends MySQLCommandPacketType(0x15)
+
+ // https://dev.mysql.com/doc/internals/en/com-stmt-prepare.html
+ object COM_STMT_PREPARE extends MySQLCommandPacketType(0x16)
+
+ // https://dev.mysql.com/doc/internals/en/com-stmt-execute.html
+ object COM_STMT_EXECUTE extends MySQLCommandPacketType(0x17)
+
+ // https://dev.mysql.com/doc/internals/en/com-stmt-send-long-data.html
+ object COM_STMT_SEND_LONG_DATA extends MySQLCommandPacketType(0x18)
+
+ // https://dev.mysql.com/doc/internals/en/com-stmt-close.html
+ object COM_STMT_CLOSE extends MySQLCommandPacketType(0x19)
+
+ // https://dev.mysql.com/doc/internals/en/com-stmt-reset.html
+ object COM_STMT_RESET extends MySQLCommandPacketType(0x1a)
+
+ // https://dev.mysql.com/doc/internals/en/com-set-option.html
+ object COM_SET_OPTION extends MySQLCommandPacketType(0x1b)
+
+ // https://dev.mysql.com/doc/internals/en/com-stmt-fetch.html
+ object COM_STMT_FETCH extends MySQLCommandPacketType(0x1c)
+
+ // https://dev.mysql.com/doc/internals/en/com-daemon.html
+ object COM_DAEMON extends MySQLCommandPacketType(0x1d)
+
+ // https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html
+ object COM_BINLOG_DUMP_GTID extends MySQLCommandPacketType(0x1e)
+
+ // https://dev.mysql.com/doc/internals/en/com-reset-connection.html
+ object COM_RESET_CONNECTION extends MySQLCommandPacketType(0x1f)
+
+ def valueOf(value: Int): MySQLCommandPacketType = value match {
+ case COM_SLEEP.value => COM_SLEEP
+ case COM_QUIT.value => COM_QUIT
+ case COM_INIT_DB.value => COM_INIT_DB
+ case COM_QUERY.value => COM_QUERY
+ case COM_FIELD_LIST.value => COM_FIELD_LIST
+ case COM_CREATE_DB.value => COM_CREATE_DB
+ case COM_DROP_DB.value => COM_DROP_DB
+ case COM_REFRESH.value => COM_REFRESH
+ case COM_SHUTDOWN.value => COM_SHUTDOWN
+ case COM_STATISTICS.value => COM_STATISTICS
+ case COM_PROCESS_INFO.value => COM_PROCESS_INFO
+ case COM_CONNECT.value => COM_CONNECT
+ case COM_PROCESS_KILL.value => COM_PROCESS_KILL
+ case COM_DEBUG.value => COM_DEBUG
+ case COM_PING.value => COM_PING
+ case COM_TIME.value => COM_TIME
+ case COM_DELAYED_INSERT.value => COM_DELAYED_INSERT
+ case COM_CHANGE_USER.value => COM_CHANGE_USER
+ case COM_BINLOG_DUMP.value => COM_BINLOG_DUMP
+ case COM_TABLE_DUMP.value => COM_TABLE_DUMP
+ case COM_CONNECT_OUT.value => COM_CONNECT_OUT
+ case COM_REGISTER_SLAVE.value => COM_REGISTER_SLAVE
+ case COM_STMT_PREPARE.value => COM_STMT_PREPARE
+ case COM_STMT_EXECUTE.value => COM_STMT_EXECUTE
+ case COM_STMT_SEND_LONG_DATA.value => COM_STMT_SEND_LONG_DATA
+ case COM_STMT_CLOSE.value => COM_STMT_CLOSE
+ case COM_STMT_RESET.value => COM_STMT_RESET
+ case COM_SET_OPTION.value => COM_SET_OPTION
+ case COM_STMT_FETCH.value => COM_STMT_FETCH
+ case COM_DAEMON.value => COM_DAEMON
+ case COM_BINLOG_DUMP_GTID.value => COM_BINLOG_DUMP_GTID
+ case COM_RESET_CONNECTION.value => COM_RESET_CONNECTION
+ }
+}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/constant/MySQLDataType.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/constant/MySQLDataType.scala
new file mode 100644
index 0000000..7a76b47
--- /dev/null
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/constant/MySQLDataType.scala
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.mysql.constant
+
+import java.sql.Types
+
+import org.apache.hive.service.rpc.thrift.TTypeId
+
+sealed abstract class MySQLDataType(val value: Int)
+
+// https://dev.mysql.com/doc/internals/en/com-query-response.html#column-type
+object MySQLDataType {
+ object DECIMAL extends MySQLDataType(0x00)
+
+ object TINY extends MySQLDataType(0x01)
+
+ object SHORT extends MySQLDataType(0x02)
+
+ object LONG extends MySQLDataType(0x03)
+
+ object FLOAT extends MySQLDataType(0x04)
+
+ object DOUBLE extends MySQLDataType(0x05)
+
+ object NULL extends MySQLDataType(0x06)
+
+ object TIMESTAMP extends MySQLDataType(0x07)
+
+ object LONGLONG extends MySQLDataType(0x08)
+
+ object INT24 extends MySQLDataType(0x09)
+
+ object DATE extends MySQLDataType(0x0a)
+
+ object TIME extends MySQLDataType(0x0b)
+
+ object DATETIME extends MySQLDataType(0x0c)
+
+ object YEAR extends MySQLDataType(0x0d)
+
+ // Internal to MySQL Server
+ object NEWDATE extends MySQLDataType(0x0e)
+
+ object VARCHAR extends MySQLDataType(0x0f)
+
+ object BIT extends MySQLDataType(0x10)
+
+ // Internal to MySQL Server
+ object TIMESTAMP2 extends MySQLDataType(0x11)
+
+ // Internal to MySQL Server
+ object DATETIME2 extends MySQLDataType(0x12)
+
+ // Internal to MySQL Server
+ object TIME2 extends MySQLDataType(0x13)
+
+ // Do not describe in document, but actual exist.
+ // https://github.com/apache/shardingsphere/issues/4795
+ object JSON extends MySQLDataType(0xf5)
+
+ object NEWDECIMAL extends MySQLDataType(0xf6)
+
+ object ENUM extends MySQLDataType(0xf7)
+
+ object SET extends MySQLDataType(0xf8)
+
+ object TINY_BLOB extends MySQLDataType(0xf9)
+
+ object MEDIUM_BLOB extends MySQLDataType(0xfa)
+
+ object LONG_BLOB extends MySQLDataType(0xfb)
+
+ object BLOB extends MySQLDataType(0xfc)
+
+ object VAR_STRING extends MySQLDataType(0xfd)
+
+ object STRING extends MySQLDataType(0xfe)
+
+ object GEOMETRY extends MySQLDataType(0xff)
+
+ def valueOf(value: Int): MySQLDataType = value match {
+ case 0x00 => DECIMAL
+ case 0x01 => TINY
+ case 0x02 => SHORT
+ case 0x03 => LONG
+ case 0x04 => FLOAT
+ case 0x05 => DOUBLE
+ case 0x06 => NULL
+ case 0x07 => TIMESTAMP
+ case 0x08 => LONGLONG
+ case 0x09 => INT24
+ case 0x0a => DATE
+ case 0x0b => TIME
+ case 0x0c => DATETIME
+ case 0x0d => YEAR
+ case 0x0e => NEWDATE
+ case 0x0f => VARCHAR
+ case 0x10 => BIT
+ case 0x11 => TIMESTAMP2
+ case 0x12 => DATETIME2
+ case 0x13 => TIME2
+ case 0xf5 => JSON
+ case 0xf6 => NEWDECIMAL
+ case 0xf7 => ENUM
+ case 0xf8 => SET
+ case 0xf9 => TINY_BLOB
+ case 0xfa => MEDIUM_BLOB
+ case 0xfb => LONG_BLOB
+ case 0xfc => BLOB
+ case 0xfd => VAR_STRING
+ case 0xfe => STRING
+ case 0xff => GEOMETRY
+ case other => throw new IllegalArgumentException(
+ s"Illegal value $other of MySQLDataType")
+ }
+
+ def valueOfJdbcType(jdbcValue: Int): MySQLDataType = jdbcValue match {
+ case Types.BIT => BIT
+ case Types.TINYINT => TINY
+ case Types.SMALLINT => SHORT
+ case Types.INTEGER => LONG
+ case Types.BIGINT => LONGLONG
+ case Types.FLOAT => FLOAT
+ case Types.REAL => FLOAT
+ case Types.DOUBLE => DOUBLE
+ case Types.NUMERIC => NEWDECIMAL
+ case Types.DECIMAL => NEWDECIMAL
+ case Types.CHAR => STRING
+ case Types.VARCHAR => VAR_STRING
+ case Types.LONGVARCHAR => VAR_STRING
+ case Types.DATE => DATE
+ case Types.TIME => TIME
+ case Types.TIMESTAMP => TIMESTAMP
+ case Types.BINARY => STRING
+ case Types.VARBINARY => VAR_STRING
+ case Types.LONGVARBINARY => VAR_STRING
+ case Types.NULL => NULL
+ case Types.BLOB => BLOB
+ case other => throw new IllegalArgumentException(
+ s"Illegal JDBC type value $other of MySQLDataType")
+ }
+
+ def valueOfThriftType(tType: TTypeId): MySQLDataType = tType match {
+ case TTypeId.BOOLEAN_TYPE => TINY
+ case TTypeId.TINYINT_TYPE => TINY
+ case TTypeId.SMALLINT_TYPE => SHORT
+ case TTypeId.INT_TYPE => LONG
+ case TTypeId.BIGINT_TYPE => LONGLONG
+ case TTypeId.FLOAT_TYPE => FLOAT
+ case TTypeId.DOUBLE_TYPE => DOUBLE
+ case TTypeId.STRING_TYPE => VAR_STRING
+ case TTypeId.TIMESTAMP_TYPE => TIMESTAMP
+ case TTypeId.BINARY_TYPE => STRING
+ case TTypeId.ARRAY_TYPE => VAR_STRING // not exactly match, fallback
+ case TTypeId.MAP_TYPE => VAR_STRING // not exactly match, fallback
+ case TTypeId.STRUCT_TYPE => VAR_STRING // not exactly match, fallback
+ case TTypeId.UNION_TYPE => VAR_STRING // not exactly match, fallback
+ case TTypeId.USER_DEFINED_TYPE => VAR_STRING // not exactly match, fallback
+ case TTypeId.DECIMAL_TYPE => NEWDECIMAL
+ case TTypeId.NULL_TYPE => NULL
+ case TTypeId.DATE_TYPE => DATE
+ case TTypeId.VARCHAR_TYPE => VAR_STRING
+ case TTypeId.CHAR_TYPE => STRING
+ case TTypeId.INTERVAL_YEAR_MONTH_TYPE => VAR_STRING // not exactly match,
fallback
+ case TTypeId.INTERVAL_DAY_TIME_TYPE => VAR_STRING // not exactly match,
fallback
+ }
+}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/constant/MySQLErrorCode.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/constant/MySQLErrorCode.scala
new file mode 100644
index 0000000..a6c35b1
--- /dev/null
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/constant/MySQLErrorCode.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.mysql.constant
+
+import org.apache.kyuubi.KyuubiSQLException
+
+case class MySQLErrorCode(errorCode: Int, sqlState: String, errorMessage:
String) {
+ override def toString: String = s"ERROR $errorCode ($sqlState):
$errorMessage"
+
+ def toKyuubiSQLException: KyuubiSQLException = {
+ new KyuubiSQLException(errorMessage, sqlState, errorCode, null)
+ }
+}
+
+object MySQLErrorCode {
+
+ object TOO_MANY_CONNECTIONS_EXCEPTION extends MySQLErrorCode(
+ 1040,
+ "08004",
+ "Too many connections")
+
+ object RUNTIME_EXCEPTION extends MySQLErrorCode(
+ 1997,
+ "C1997",
+ "Runtime exception: %s")
+
+ object UNSUPPORTED_COMMAND extends MySQLErrorCode(
+ 1998,
+ "C1998",
+ "Unsupported command: %s")
+
+ object UNKNOWN_EXCEPTION extends MySQLErrorCode(
+ 1999,
+ "C1999",
+ "Unknown exception: %s")
+
+ object ER_DBACCESS_DENIED_ERROR extends MySQLErrorCode(
+ 1044,
+ "42000",
+ "Access denied for user '%s'@'%s' to database '%s'")
+
+ object ER_ACCESS_DENIED_ERROR extends MySQLErrorCode(
+ 1045,
+ "28000",
+ "Access denied for user '%s'@'%s' (using password: %s)")
+
+ object ER_NO_DB_ERROR extends MySQLErrorCode(
+ 1046,
+ "3D000",
+ "No database selected")
+
+ object ER_BAD_DB_ERROR extends MySQLErrorCode(
+ 1049,
+ "42000",
+ "Unknown database '%s'")
+
+ object ER_INTERNAL_ERROR extends MySQLErrorCode(
+ 1815,
+ "HY000",
+ "Internal error: %s")
+
+ object ER_UNSUPPORTED_PS extends MySQLErrorCode(
+ 1295,
+ "HY000",
+ "This command is not supported in the prepared statement protocol yet")
+
+ object ER_DB_CREATE_EXISTS_ERROR extends MySQLErrorCode(
+ 1007,
+ "HY000",
+ "Can't create database '%s'; database exists")
+
+ object ER_DB_DROP_EXISTS_ERROR extends MySQLErrorCode(
+ 1008,
+ "HY000",
+ "Can't drop database '%s'; database doesn't exist")
+
+ object ER_TABLE_EXISTS_ERROR extends MySQLErrorCode(
+ 1050,
+ "42S01",
+ "Table '%s' already exists")
+
+ object ER_NO_SUCH_TABLE extends MySQLErrorCode(
+ 1146,
+ "42S02",
+ "Table '%s' doesn't exist")
+
+ object ER_NOT_SUPPORTED_YET extends MySQLErrorCode(
+ 1235,
+ "42000",
+ "This version of Kyuubi-Server doesn't yet support this SQL. '%s'")
+}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/constant/MySQLFieldDetailFlag.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/constant/MySQLFieldDetailFlag.scala
new file mode 100644
index 0000000..c1694ae
--- /dev/null
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/constant/MySQLFieldDetailFlag.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.mysql.constant
+
+sealed abstract class MySQLFieldDetailFlag(val value: Int)
+
+// https://mariadb.com/kb/en/library/resultset/#field-detail-flag
+object MySQLFieldDetailFlag {
+
+ object NOT_NULL extends MySQLFieldDetailFlag(0x00000001)
+
+ object PRIMARY_KEY extends MySQLFieldDetailFlag(0x00000002)
+
+ object UNIQUE_KEY extends MySQLFieldDetailFlag(0x00000004)
+
+ object MULTIPLE_KEY extends MySQLFieldDetailFlag(0x00000008)
+
+ object BLOB extends MySQLFieldDetailFlag(0x00000010)
+
+ object UNSIGNED extends MySQLFieldDetailFlag(0x00000020)
+
+ object ZEROFILL_FLAG extends MySQLFieldDetailFlag(0x00000040)
+
+ object BINARY_COLLATION extends MySQLFieldDetailFlag(0x00000080)
+
+ object ENUM extends MySQLFieldDetailFlag(0x00000100)
+
+ object AUTO_INCREMENT extends MySQLFieldDetailFlag(0x00000200)
+
+ object TIMESTAMP extends MySQLFieldDetailFlag(0x00000400)
+
+ object SET extends MySQLFieldDetailFlag(0x00000800)
+
+ object NO_DEFAULT_VALUE_FLAG extends MySQLFieldDetailFlag(0x00001000)
+
+ object ON_UPDATE_NOW_FLAG extends MySQLFieldDetailFlag(0x00002000)
+
+ object NUM_FLAG extends MySQLFieldDetailFlag(0x00008000)
+}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/constant/MySQLServerDefines.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/constant/MySQLServerDefines.scala
new file mode 100644
index 0000000..6c57254
--- /dev/null
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/constant/MySQLServerDefines.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.mysql.constant
+
+import org.apache.kyuubi._
+
+object MySQLServerDefines {
+ val PROTOCOL_VERSION = 0x0a
+ val CHARSET = 0x2d // utf8mb4_general_ci
+ val MYSQL_VERSION = "5.7.22"
+ val MYSQL_KYUUBI_SERVER_VERSION = s"$MYSQL_VERSION-Kyuubi-Server
$KYUUBI_VERSION"
+ val KYUUBI_SERVER_DESCRIPTION = s"Apache Kyuubi (Incubating)
v$KYUUBI_VERSION revision $REVISION"
+}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/constant/MySQLStatusFlag.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/constant/MySQLStatusFlag.scala
new file mode 100644
index 0000000..b3eb7c6
--- /dev/null
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/constant/MySQLStatusFlag.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.mysql.constant
+
+sealed abstract class MySQLStatusFlag(val value: Int)
+
+//
https://dev.mysql.com/doc/internals/en/status-flags.html#packet-Protocol::StatusFlags
+object MySQLStatusFlag {
+
+ object SERVER_STATUS_IN_TRANS extends MySQLStatusFlag(0x0001)
+
+ object SERVER_STATUS_AUTOCOMMIT extends MySQLStatusFlag(0x0002)
+
+ object SERVER_MORE_RESULTS_EXISTS extends MySQLStatusFlag(0x0008)
+
+ object SERVER_STATUS_NO_GOOD_INDEX_USED extends MySQLStatusFlag(0x0010)
+
+ object SERVER_STATUS_NO_INDEX_USED extends MySQLStatusFlag(0x0020)
+
+ object SERVER_STATUS_CURSOR_EXISTS extends MySQLStatusFlag(0x0040)
+
+ object SERVER_STATUS_LAST_ROW_SENT extends MySQLStatusFlag(0x0080)
+
+ object SERVER_STATUS_DB_DROPPED extends MySQLStatusFlag(0x0100)
+
+ object SERVER_STATUS_NO_BACKSLASH_ESCAPES extends MySQLStatusFlag(0x0200)
+
+ object SERVER_STATUS_METADATA_CHANGED extends MySQLStatusFlag(0x0400)
+
+ object SERVER_QUERY_WAS_SLOW extends MySQLStatusFlag(0x0800)
+
+ object SERVER_PS_OUT_PARAMS extends MySQLStatusFlag(0x1000)
+
+ object SERVER_STATUS_IN_TRANS_READONLY extends MySQLStatusFlag(0x2000)
+
+ object SERVER_SESSION_STATE_CHANGED extends MySQLStatusFlag(0x4000)
+}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/mysql/MySQLCodecHelper.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/mysql/MySQLCodecHelper.scala
new file mode 100644
index 0000000..4b40be8
--- /dev/null
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/mysql/MySQLCodecHelper.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.mysql
+
+import io.netty.buffer.{ByteBuf, ByteBufUtil, Unpooled}
+
+import org.apache.kyuubi.KyuubiFunSuite
+
+trait MySQLCodecHelper extends KyuubiFunSuite {
+
+ def decodeHex(hexDump: String): ByteBuf = {
+ val compact = hexDump.replaceAll("(?s)\\s", "")
+ val bytes = ByteBufUtil.decodeHexDump(compact)
+ Unpooled.copiedBuffer(bytes)
+ }
+
+ def verifyDecode[T <: MySQLPacket](
+ decoder: SupportsDecode[T],
+ payload: ByteBuf,
+ expected: T
+ )(assertion: (T, T) => Unit): Unit = {
+ val decoded = decoder.decode(payload)
+ assertion(decoded, expected)
+ }
+
+ def verifyEncode(expected: ByteBuf, packet: SupportsEncode): Unit = {
+ val encoded = Unpooled.buffer()
+ packet.encode(encoded)
+ assert(encoded === expected)
+ }
+}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/mysql/MySQLCommandPacketSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/mysql/MySQLCommandPacketSuite.scala
new file mode 100644
index 0000000..3cd6f26
--- /dev/null
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/mysql/MySQLCommandPacketSuite.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.mysql
+
+import org.apache.kyuubi.KyuubiFunSuite
+
+class MySQLCommandPacketSuite extends KyuubiFunSuite with MySQLCodecHelper {
+
+ test("decode MySQLComInitDbPacket") {
+ val payload = decodeHex("6b 79 75 75 62 69")
+ val expected = MySQLComInitDbPacket("kyuubi")
+ verifyDecode(MySQLComInitDbPacket, payload, expected) { (decoded,
expected) =>
+ assert(decoded === expected)
+ }
+ }
+
+ test("decode MySQLComFieldListPacket") {
+ val payload = decodeHex("6b 79 75 75 62 69 00 2a")
+ val expected = MySQLComFieldListPacket("kyuubi", "*")
+ verifyDecode(MySQLComFieldListPacket, payload, expected) { (decoded,
expected) =>
+ assert(decoded === expected)
+ }
+ }
+
+ test("decode MySQLComQueryPacket") {
+ val payload = decodeHex(
+ """73 65 6c 65 63 74 20 6b 79 75 75 62 69 5f 76 65
+ |72 73 69 6f 6e 28 29
+ |""".stripMargin)
+ val expected = MySQLComQueryPacket("select kyuubi_version()")
+ verifyDecode(MySQLComQueryPacket, payload, expected) { (decoded, expected)
=>
+ assert(decoded === expected)
+ }
+ }
+}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/mysql/MySQLDataPacketSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/mysql/MySQLDataPacketSuite.scala
new file mode 100644
index 0000000..2c23b31
--- /dev/null
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/mysql/MySQLDataPacketSuite.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.mysql
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.server.mysql.constant.MySQLDataType
+
+class MySQLDataPacketSuite extends KyuubiFunSuite with MySQLCodecHelper {
+
+ test("encode MySQLFieldCountPacket") {
+ val packet = MySQLFieldCountPacket(1, 1)
+ val expected = decodeHex("01")
+ verifyEncode(expected, packet)
+ }
+
+ test("encode MySQLColumnDefinition41Packet") {
+ val packet = MySQLColumnDefinition41Packet(1, 0, "UDF()", 100,
MySQLDataType.VAR_STRING, 0)
+ val expected = decodeHex(
+ """00 00 00 00 05 55 44 46 28 29 00 0c 2d 00 64 00
+ |00 00 fd 00 00 00 00 00
+ |""".stripMargin)
+ verifyEncode(expected, packet)
+ }
+
+ test("encode MySQLTextResultSetRowPacket") {
+ val packet = MySQLTextResultSetRowPacket(2, Seq("1.4.0-SNAPSHOT"))
+ val expected = decodeHex("0e 31 2e 34 2e 30 2d 53 4e 41 50 53 48 4f 54")
+ verifyEncode(expected, packet)
+ }
+}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/mysql/MySQLGenericPacketSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/mysql/MySQLGenericPacketSuite.scala
new file mode 100644
index 0000000..5a463f3
--- /dev/null
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/mysql/MySQLGenericPacketSuite.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.mysql
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.server.mysql.constant.MySQLErrorCode
+
+class MySQLGenericPacketSuite extends KyuubiFunSuite with MySQLCodecHelper {
+
+ test("encode MySQLOKPacket") {
+ val packet = MySQLOKPacket(1, 2, 3)
+ val expected = decodeHex("00 02 03 02 00 00 00")
+ verifyEncode(expected, packet)
+ }
+
+ test("encode MySQLErrPacket") {
+ val packet = MySQLErrPacket(1,
MySQLErrorCode.TOO_MANY_CONNECTIONS_EXCEPTION)
+ val expected = decodeHex(
+ """ff 10 04 23 30 38 30 30 34 54 6f 6f 20 6d 61 6e
+ |79 20 63 6f 6e 6e 65 63 74 69 6f 6e 73
+ |""".stripMargin)
+ verifyEncode(expected, packet)
+ }
+
+ test("encode MySQLEofPacket") {
+ val packet = MySQLEofPacket()
+ val expected = decodeHex("fe 00 00 02 00")
+ verifyEncode(expected, packet)
+ }
+}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/mysql/authentication/MySQLAuthPacketSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/mysql/authentication/MySQLAuthPacketSuite.scala
new file mode 100644
index 0000000..7379eff
--- /dev/null
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/mysql/authentication/MySQLAuthPacketSuite.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server.mysql.authentication
+
+import java.util
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.server.mysql.MySQLCodecHelper
+
+class MySQLAuthPacketSuite extends KyuubiFunSuite with MySQLCodecHelper {
+
+ private val authPluginData = {
+ val part1 = decodeHex("77 37 34 35 45 51 55 65").array
+ val part2 = decodeHex("69 44 57 32 44 44 33 4e 6d 36 69 74").array
+ MySQLNativePassword.PluginData(part1, part2)
+ }
+
+ test("encode MySQLHandshakePacket") {
+ val packet = MySQLHandshakePacket(2, authPluginData)
+ val expected = decodeHex(
+ """0a 35 2e 37 2e 32 32 2d 4b 79 75 75 62 69 2d 53
+ |65 72 76 65 72 20 31 2e 34 2e 30 2d 53 4e 41 50
+ |53 48 4f 54 00 02 00 00 00 77 37 34 35 45 51 55
+ |65 00 4f b7 2d 02 00 08 00 15 00 00 00 00 00 00
+ |00 00 00 00 69 44 57 32 44 44 33 4e 6d 36 69 74
+ |00 6d 79 73 71 6c 5f 6e 61 74 69 76 65 5f 70 61
+ |73 73 77 6f 72 64 00
+ |""".stripMargin)
+ verifyEncode(expected, packet)
+ }
+
+ test("decode MySQLHandshakeResponse41Packet") {
+ val payload = decodeHex(
+ """01 85 a6 ff 19 00 00 00 01 2d 00 00 00 00 00 00
+ |00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
+ |00 63 68 65 6e 67 70 61 6e 00 14 15 c6 82 8e 53
+ |67 20 3a 44 f3 d1 3e 62 f8 2d 20 38 3c 75 94 6d
+ |79 73 71 6c 5f 6e 61 74 69 76 65 5f 70 61 73 73
+ |77 6f 72 64 00
+ |""".stripMargin)
+ val expected = MySQLHandshakeResponse41Packet(
+ 1,
+ 0x19ffa685,
+ 16777216,
+ 0x2d,
+ "chengpan",
+ decodeHex(
+ """15 c6 82 8e 53 67 20 3a 44 f3 d1 3e 62 f8 2d 20
+ |38 3c 75 94
+ |""".stripMargin).array,
+ null,
+ "mysql_native_password")
+ verifyDecode(MySQLHandshakeResponse41Packet, payload, expected) {
(decoded, expected) =>
+ assert(decoded.sequenceId === expected.sequenceId)
+ assert(decoded.capabilityFlags === expected.capabilityFlags)
+ assert(decoded.maxPacketSize === expected.maxPacketSize)
+ assert(decoded.characterSet === expected.characterSet)
+ assert(decoded.username === expected.username)
+ assert(util.Arrays.equals(decoded.authResponse, expected.authResponse))
+ assert(decoded.database === expected.database)
+ assert(decoded.authPluginName === expected.authPluginName)
+ }
+ }
+
+ test("encode MySQLAuthSwitchRequestPacket") {
+ val packet = MySQLAuthSwitchRequestPacket(
+ 1,
+ MySQLAuthenticationMethod.NATIVE_PASSWORD.method,
+ authPluginData)
+ val expected = decodeHex(
+ """fe 6d 79 73 71 6c 5f 6e 61 74 69 76 65 5f 70 61
+ |73 73 77 6f 72 64 00 77 37 34 35 45 51 55 65 69
+ |44 57 32 44 44 33 4e 6d 36 69 74 00
+ |""".stripMargin)
+ verifyEncode(expected, packet)
+ }
+
+ test("decode MySQLAuthSwitchResponsePacket") {
+ val payloadHex = decodeHex(
+ """14 00 00 03 f4 17 96 1f 79 f3 ac 10 0b da a6 b3
+ |b5 c2 0e ab 59 85 ff b8
+ |""".stripMargin)
+ val expectedAuthPluginResponse = decodeHex(
+ """00 00 03 f4 17 96 1f 79 f3 ac 10 0b da a6 b3 b5
+ |c2 0e ab 59 85 ff b8
+ |""".stripMargin).array
+
+ verifyDecode(
+ MySQLAuthSwitchResponsePacket,
+ payloadHex,
+ MySQLAuthSwitchResponsePacket(20, expectedAuthPluginResponse)) {
(decoded, expected) =>
+ assert(decoded.sequenceId === expected.sequenceId)
+ assert(util.Arrays.equals(decoded.authPluginResponse,
expected.authPluginResponse))
+ }
+ }
+}