This is an automated email from the ASF dual-hosted git repository.
bowenliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 09febce0d [KYUUBI #5861] Generalize TRowSet generator for JDBC engine
with dialects
09febce0d is described below
commit 09febce0d8ac4f5297c2549f5f4254d1921b9b76
Author: Bowen Liang <[email protected]>
AuthorDate: Mon Dec 18 21:23:11 2023 +0800
[KYUUBI #5861] Generalize TRowSet generator for JDBC engine with dialects
# :mag: Description
## Issue References ๐
As described.
## Describe Your Solution ๐ง
- Introduced JdbcTRowSetGenerator extending `AbstractTRowSetGenerator `
introduced in #5851 in JDBC engine.
- Provide a DefaultJdbcTRowSetGenerator as default implementation for
mapping the JDBC data types to TRowSet generation
- Make JDBC dialect providing TRowSetGenerator extending
DefaultJdbcTRowSetGenerator to adapt detailed differences
## Types of changes :bookmark:
- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
## Test Plan ๐งช
#### Behavior Without This Pull Request :coffin:
#### Behavior With This Pull Request :tada:
#### Related Unit Tests
---
# Checklists
## ๐ Author Self Checklist
- [x] My code follows the [style
guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html)
of this project
- [x] I have performed a self-review
- [x] I have commented my code, particularly in hard-to-understand areas
- [x] I have made corresponding changes to the documentation
- [ ] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my feature
works
- [x] New and existing unit tests pass locally with my changes
- [x] This patch was not authored or co-authored using [Generative
Tooling](https://www.apache.org/legal/generative-tooling.html)
## ๐ Committer Pre-Merge Checklist
- [ ] Pull request title is okay.
- [ ] No license issues.
- [ ] Milestone correctly set?
- [ ] Test coverage is ok
- [ ] Assignees are selected.
- [ ] Minimum number of approvals
- [ ] No changes are requested
**Be nice. Be informative.**
Closes #5861 from bowenliang123/jdbc-rowgen.
Closes #5861
7f8658df1 [Bowen Liang] generalize jdbc TRowSet generator
Authored-by: Bowen Liang <[email protected]>
Signed-off-by: liangbowen <[email protected]>
---
.../kyuubi/engine/jdbc/dialect/DorisDialect.scala | 8 +-
.../kyuubi/engine/jdbc/dialect/JdbcDialect.scala | 4 +-
.../kyuubi/engine/jdbc/dialect/MySQLDialect.scala | 8 +-
.../engine/jdbc/dialect/PhoenixDialect.scala | 8 +-
.../engine/jdbc/dialect/PostgreSQLDialect.scala | 8 +-
...SetHelper.scala => DorisTRowSetGenerator.scala} | 14 +-
...SetHelper.scala => MySQLTRowSetGenerator.scala} | 28 +-
.../engine/jdbc/operation/JdbcOperation.scala | 7 +-
...lper.scala => PostgreSQLTRowSetGenerator.scala} | 10 +-
.../jdbc/schema/DefaultJdbcTRowSetGenerator.scala | 151 ++++++++++
.../JdbcTRowSetGenerator.scala} | 15 +-
.../kyuubi/engine/jdbc/schema/RowSetHelper.scala | 324 ---------------------
.../engine/schema/AbstractTRowSetGenerator.scala | 12 +-
13 files changed, 211 insertions(+), 386 deletions(-)
diff --git
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/DorisDialect.scala
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/DorisDialect.scala
index f7c1ace64..32801c36e 100644
---
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/DorisDialect.scala
+++
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/DorisDialect.scala
@@ -23,8 +23,8 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.commons.lang3.StringUtils
-import org.apache.kyuubi.engine.jdbc.doris.{DorisRowSetHelper,
DorisSchemaHelper}
-import org.apache.kyuubi.engine.jdbc.schema.{RowSetHelper, SchemaHelper}
+import org.apache.kyuubi.engine.jdbc.doris.{DorisSchemaHelper,
DorisTRowSetGenerator}
+import org.apache.kyuubi.engine.jdbc.schema.{JdbcTRowSetGenerator,
SchemaHelper}
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session
@@ -120,9 +120,7 @@ class DorisDialect extends JdbcDialect {
query.toString()
}
- override def getRowSetHelper(): RowSetHelper = {
- new DorisRowSetHelper
- }
+ override def getTRowSetGenerator(): JdbcTRowSetGenerator = new
DorisTRowSetGenerator
override def getSchemaHelper(): SchemaHelper = {
new DorisSchemaHelper
diff --git
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/JdbcDialect.scala
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/JdbcDialect.scala
index 364898b51..6c2d3b1e0 100644
---
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/JdbcDialect.scala
+++
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/JdbcDialect.scala
@@ -22,7 +22,7 @@ import java.util
import org.apache.kyuubi.{KyuubiException, KyuubiSQLException, Logging}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_JDBC_CONNECTION_URL,
ENGINE_JDBC_SHORT_NAME}
-import org.apache.kyuubi.engine.jdbc.schema.{RowSetHelper, SchemaHelper}
+import org.apache.kyuubi.engine.jdbc.schema.{JdbcTRowSetGenerator,
SchemaHelper}
import org.apache.kyuubi.engine.jdbc.util.SupportServiceLoader
import org.apache.kyuubi.operation.Operation
import org.apache.kyuubi.session.Session
@@ -78,7 +78,7 @@ abstract class JdbcDialect extends SupportServiceLoader with
Logging {
throw KyuubiSQLException.featureNotSupported()
}
- def getRowSetHelper(): RowSetHelper
+ def getTRowSetGenerator(): JdbcTRowSetGenerator
def getSchemaHelper(): SchemaHelper
}
diff --git
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/MySQLDialect.scala
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/MySQLDialect.scala
index 1cafcd9a9..e13924363 100644
---
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/MySQLDialect.scala
+++
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/MySQLDialect.scala
@@ -23,8 +23,8 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.commons.lang3.StringUtils
-import org.apache.kyuubi.engine.jdbc.mysql.{MySQLRowSetHelper,
MySQLSchemaHelper}
-import org.apache.kyuubi.engine.jdbc.schema.{RowSetHelper, SchemaHelper}
+import org.apache.kyuubi.engine.jdbc.mysql.{MySQLSchemaHelper,
MySQLTRowSetGenerator}
+import org.apache.kyuubi.engine.jdbc.schema.{JdbcTRowSetGenerator,
SchemaHelper}
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session
@@ -122,9 +122,7 @@ class MySQLDialect extends JdbcDialect {
query.toString()
}
- override def getRowSetHelper(): RowSetHelper = {
- new MySQLRowSetHelper
- }
+ override def getTRowSetGenerator(): JdbcTRowSetGenerator = new
MySQLTRowSetGenerator
override def getSchemaHelper(): SchemaHelper = {
new MySQLSchemaHelper
diff --git
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/PhoenixDialect.scala
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/PhoenixDialect.scala
index 4c8e8f265..e12f5d04b 100644
---
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/PhoenixDialect.scala
+++
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/PhoenixDialect.scala
@@ -22,8 +22,8 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.commons.lang3.StringUtils
-import org.apache.kyuubi.engine.jdbc.phoenix.{PhoenixRowSetHelper,
PhoenixSchemaHelper}
-import org.apache.kyuubi.engine.jdbc.schema.{RowSetHelper, SchemaHelper}
+import org.apache.kyuubi.engine.jdbc.phoenix.PhoenixSchemaHelper
+import org.apache.kyuubi.engine.jdbc.schema.{DefaultJdbcTRowSetGenerator,
JdbcTRowSetGenerator, SchemaHelper}
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session
@@ -100,9 +100,7 @@ class PhoenixDialect extends JdbcDialect {
query.toString()
}
- override def getRowSetHelper(): RowSetHelper = {
- new PhoenixRowSetHelper
- }
+ override def getTRowSetGenerator(): JdbcTRowSetGenerator = new
DefaultJdbcTRowSetGenerator
override def getSchemaHelper(): SchemaHelper = {
new PhoenixSchemaHelper
diff --git
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/PostgreSQLDialect.scala
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/PostgreSQLDialect.scala
index df64f10b2..d3d4c8297 100644
---
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/PostgreSQLDialect.scala
+++
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/PostgreSQLDialect.scala
@@ -25,8 +25,8 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.commons.lang3.StringUtils
import org.apache.kyuubi.KyuubiSQLException
-import org.apache.kyuubi.engine.jdbc.postgresql.{PostgreSQLRowSetHelper,
PostgreSQLSchemaHelper}
-import org.apache.kyuubi.engine.jdbc.schema.{RowSetHelper, SchemaHelper}
+import org.apache.kyuubi.engine.jdbc.postgresql.{PostgreSQLSchemaHelper,
PostgreSQLTRowSetGenerator}
+import org.apache.kyuubi.engine.jdbc.schema.{JdbcTRowSetGenerator,
SchemaHelper}
import org.apache.kyuubi.operation.Operation
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session
@@ -181,9 +181,7 @@ class PostgreSQLDialect extends JdbcDialect {
throw KyuubiSQLException.featureNotSupported()
}
- override def getRowSetHelper(): RowSetHelper = {
- new PostgreSQLRowSetHelper
- }
+ override def getTRowSetGenerator(): JdbcTRowSetGenerator = new
PostgreSQLTRowSetGenerator
override def getSchemaHelper(): SchemaHelper = {
new PostgreSQLSchemaHelper
diff --git
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/doris/DorisRowSetHelper.scala
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/doris/DorisTRowSetGenerator.scala
similarity index 65%
rename from
externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/doris/DorisRowSetHelper.scala
rename to
externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/doris/DorisTRowSetGenerator.scala
index 66a247d31..26c64d81b 100644
---
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/doris/DorisRowSetHelper.scala
+++
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/doris/DorisTRowSetGenerator.scala
@@ -16,20 +16,20 @@
*/
package org.apache.kyuubi.engine.jdbc.doris
-import org.apache.kyuubi.engine.jdbc.schema.RowSetHelper
-import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
+import org.apache.kyuubi.engine.jdbc.schema.DefaultJdbcTRowSetGenerator
+import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TColumn, TColumnValue}
-class DorisRowSetHelper extends RowSetHelper {
+class DorisTRowSetGenerator extends DefaultJdbcTRowSetGenerator {
- override def toTinyIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn =
+ override def toTinyIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
toIntegerTColumn(rows, ordinal)
- override def toSmallIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn =
+ override def toSmallIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
toIntegerTColumn(rows, ordinal)
- override def toTinyIntTColumnValue(row: List[Any], ordinal: Int):
TColumnValue =
+ override def toTinyIntTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
toIntegerTColumnValue(row, ordinal)
- override def toSmallIntTColumnValue(row: List[Any], ordinal: Int):
TColumnValue =
+ override def toSmallIntTColumnValue(row: Seq[_], ordinal: Int): TColumnValue
=
toIntegerTColumnValue(row, ordinal)
}
diff --git
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/mysql/MySQLRowSetHelper.scala
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/mysql/MySQLTRowSetGenerator.scala
similarity index 63%
rename from
externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/mysql/MySQLRowSetHelper.scala
rename to
externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/mysql/MySQLTRowSetGenerator.scala
index c200513fd..1ed605370 100644
---
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/mysql/MySQLRowSetHelper.scala
+++
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/mysql/MySQLTRowSetGenerator.scala
@@ -18,41 +18,41 @@ package org.apache.kyuubi.engine.jdbc.mysql
import java.sql.Types
-import org.apache.kyuubi.engine.jdbc.schema.RowSetHelper
+import org.apache.kyuubi.engine.jdbc.schema.DefaultJdbcTRowSetGenerator
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TColumn, TColumnValue}
-class MySQLRowSetHelper extends RowSetHelper {
+class MySQLTRowSetGenerator extends DefaultJdbcTRowSetGenerator {
- override def toTinyIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn =
+ override def toTinyIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
toIntegerTColumn(rows, ordinal)
- override def toSmallIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn =
+ override def toSmallIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
toIntegerTColumn(rows, ordinal)
- override def toTinyIntTColumnValue(row: List[Any], ordinal: Int):
TColumnValue =
+ override def toTinyIntTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
toIntegerTColumnValue(row, ordinal)
- override def toSmallIntTColumnValue(row: List[Any], ordinal: Int):
TColumnValue =
+ override def toSmallIntTColumnValue(row: Seq[_], ordinal: Int): TColumnValue
=
toIntegerTColumnValue(row, ordinal)
- override protected def toIntegerTColumn(rows: Seq[Seq[Any]], ordinal: Int):
TColumn = {
+ override def toIntegerTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn = {
val colHead = if (rows.isEmpty) None else rows.head(ordinal)
colHead match {
- case v: Integer => super.toIntegerTColumn(rows, ordinal)
- case v: java.lang.Long => super.toBigIntTColumn(rows, ordinal)
+ case _: Integer => super.toIntegerTColumn(rows, ordinal)
+ case _: java.lang.Long => super.toBigIntTColumn(rows, ordinal)
case _ => super.toDefaultTColumn(rows, ordinal, Types.INTEGER)
}
}
- override protected def toIntegerTColumnValue(row: List[Any], ordinal: Int):
TColumnValue = {
+ override protected def toIntegerTColumnValue(row: Seq[_], ordinal: Int):
TColumnValue = {
row(ordinal) match {
- case v: Integer => super.toIntegerTColumnValue(row, ordinal)
- case v: java.lang.Long => super.toBigIntTColumnValue(row, ordinal)
+ case _: Integer => super.toIntegerTColumnValue(row, ordinal)
+ case _: java.lang.Long => super.toBigIntTColumnValue(row, ordinal)
case _ => super.toDefaultTColumnValue(row, ordinal, Types.INTEGER)
}
}
- override protected def toBigIntTColumn(rows: Seq[Seq[Any]], ordinal: Int):
TColumn = {
+ override protected def toBigIntTColumn(rows: Seq[Seq[_]], ordinal: Int):
TColumn = {
val colHead = if (rows.isEmpty) None else rows.head(ordinal)
colHead match {
case v: java.lang.Long => super.toBigIntTColumn(rows, ordinal)
@@ -60,7 +60,7 @@ class MySQLRowSetHelper extends RowSetHelper {
}
}
- override protected def toBigIntTColumnValue(row: List[Any], ordinal: Int):
TColumnValue =
+ override protected def toBigIntTColumnValue(row: Seq[_], ordinal: Int):
TColumnValue =
row(ordinal) match {
case v: java.lang.Long => super.toBigIntTColumnValue(row, ordinal)
case _ => super.toDefaultTColumnValue(row, ordinal, Types.BIGINT)
diff --git
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
index ec77ff4dd..5e5819adb 100644
---
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
+++
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala
@@ -100,11 +100,8 @@ abstract class JdbcOperation(session: Session) extends
AbstractOperation(session
override protected def afterRun(): Unit = {}
protected def toTRowSet(taken: Iterator[Row]): TRowSet = {
- val rowSetHelper = dialect.getRowSetHelper()
- rowSetHelper.toTRowSet(
- taken.toList.map(_.values),
- schema.columns,
- getProtocolVersion)
+ dialect.getTRowSetGenerator()
+ .toTRowSet(taken.toSeq.map(_.values), schema.columns, getProtocolVersion)
}
override def getResultSetMetadata: TGetResultSetMetadataResp = {
diff --git
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/postgresql/PostgreSQLRowSetHelper.scala
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/postgresql/PostgreSQLTRowSetGenerator.scala
similarity index 70%
rename from
externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/postgresql/PostgreSQLRowSetHelper.scala
rename to
externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/postgresql/PostgreSQLTRowSetGenerator.scala
index 8abc40a17..0d02aa732 100644
---
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/postgresql/PostgreSQLRowSetHelper.scala
+++
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/postgresql/PostgreSQLTRowSetGenerator.scala
@@ -16,14 +16,14 @@
*/
package org.apache.kyuubi.engine.jdbc.postgresql
-import org.apache.kyuubi.engine.jdbc.schema.RowSetHelper
-import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
+import org.apache.kyuubi.engine.jdbc.schema.DefaultJdbcTRowSetGenerator
+import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TColumn, TColumnValue}
-class PostgreSQLRowSetHelper extends RowSetHelper {
+class PostgreSQLTRowSetGenerator extends DefaultJdbcTRowSetGenerator {
- override def toSmallIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn =
+ override def toSmallIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
toIntegerTColumn(rows, ordinal)
- override def toSmallIntTColumnValue(row: List[Any], ordinal: Int):
TColumnValue =
+ override protected def toSmallIntTColumnValue(row: Seq[_], ordinal: Int):
TColumnValue =
toIntegerTColumnValue(row, ordinal)
}
diff --git
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/DefaultJdbcTRowSetGenerator.scala
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/DefaultJdbcTRowSetGenerator.scala
new file mode 100644
index 000000000..a2ad762be
--- /dev/null
+++
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/DefaultJdbcTRowSetGenerator.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.jdbc.schema
+
+import java.sql.Date
+import java.sql.Types._
+import java.time.LocalDateTime
+import java.util
+
+import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
+import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TTypeId._
+import org.apache.kyuubi.util.RowSetUtils.{bitSetToBuffer, formatDate,
formatLocalDateTime}
+
+class DefaultJdbcTRowSetGenerator extends JdbcTRowSetGenerator {
+
+ override def toTColumn(rows: Seq[Seq[_]], ordinal: Int, sqlType: Int):
TColumn =
+ sqlType match {
+ case BIT => toBitTColumn(rows, ordinal)
+ case TINYINT => toTinyIntTColumn(rows, ordinal)
+ case SMALLINT => toSmallIntTColumn(rows, ordinal)
+ case INTEGER => toIntegerTColumn(rows, ordinal)
+ case BIGINT => toBigIntTColumn(rows, ordinal)
+ case REAL => toRealTColumn(rows, ordinal)
+ case DOUBLE => toDoubleTColumn(rows, ordinal)
+ case CHAR => toCharTColumn(rows, ordinal)
+ case VARCHAR => toVarcharTColumn(rows, ordinal)
+ case _ => toDefaultTColumn(rows, ordinal, sqlType)
+ }
+
+ override def toTColumnValue(ordinal: Int, row: Seq[_], types: Seq[Column]):
TColumnValue =
+ getColumnType(types, ordinal) match {
+ case BIT => toBitTColumnValue(row, ordinal)
+ case TINYINT => toTinyIntTColumnValue(row, ordinal)
+ case SMALLINT => toSmallIntTColumnValue(row, ordinal)
+ case INTEGER => toIntegerTColumnValue(row, ordinal)
+ case BIGINT => toBigIntTColumnValue(row, ordinal)
+ case REAL => toRealTColumnValue(row, ordinal)
+ case DOUBLE => toDoubleTColumnValue(row, ordinal)
+ case CHAR => toCharTColumnValue(row, ordinal)
+ case VARCHAR => toVarcharTColumnValue(row, ordinal)
+ case otherType => toDefaultTColumnValue(row, ordinal, otherType)
+ }
+
+ protected def toDefaultTColumn(rows: Seq[Seq[_]], ordinal: Int, sqlType:
Int): TColumn = {
+ val nulls = new java.util.BitSet()
+ val rowSize = rows.length
+ val values = new util.ArrayList[String](rowSize)
+ var i = 0
+ while (i < rowSize) {
+ val row = rows(i)
+ nulls.set(i, row(ordinal) == null)
+ val value =
+ if (row(ordinal) == null) {
+ ""
+ } else {
+ toHiveString(row(ordinal), sqlType)
+ }
+ values.add(value)
+ i += 1
+ }
+ TColumn.stringVal(new TStringColumn(values, nulls))
+ }
+
+ protected def toBitTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
+ toTTypeColumn(BOOLEAN_TYPE, rows, ordinal)
+
+ protected def toTinyIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
+ toTTypeColumn(TINYINT_TYPE, rows, ordinal)
+
+ protected def toSmallIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
+ toTTypeColumn(SMALLINT_TYPE, rows, ordinal)
+
+ protected def toIntegerTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
+ toTTypeColumn(INT_TYPE, rows, ordinal)
+
+ protected def toBigIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
+ toTTypeColumn(BIGINT_TYPE, rows, ordinal)
+
+ protected def toRealTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
+ toTTypeColumn(FLOAT_TYPE, rows, ordinal)
+
+ protected def toDoubleTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
+ toTTypeColumn(DOUBLE_TYPE, rows, ordinal)
+
+ protected def toCharTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
+ toTTypeColumn(CHAR_TYPE, rows, ordinal)
+
+ protected def toVarcharTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
+ toTTypeColumn(STRING_TYPE, rows, ordinal)
+
+ // ==========================================================
+
+ protected def toBitTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
+ toTTypeColumnVal(BOOLEAN_TYPE, row, ordinal)
+
+ protected def toTinyIntTColumnValue(row: Seq[_], ordinal: Int): TColumnValue
=
+ toTTypeColumnVal(TINYINT_TYPE, row, ordinal)
+
+ protected def toSmallIntTColumnValue(row: Seq[_], ordinal: Int):
TColumnValue =
+ toTTypeColumnVal(SMALLINT_TYPE, row, ordinal)
+
+ protected def toIntegerTColumnValue(row: Seq[_], ordinal: Int): TColumnValue
=
+ toTTypeColumnVal(INT_TYPE, row, ordinal)
+
+ protected def toBigIntTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
+ toTTypeColumnVal(BIGINT_TYPE, row, ordinal)
+
+ protected def toRealTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
+ toTTypeColumnVal(FLOAT_TYPE, row, ordinal)
+
+ protected def toDoubleTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
+ toTTypeColumnVal(DOUBLE_TYPE, row, ordinal)
+
+ protected def toCharTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
+ toTTypeColumnVal(STRING_TYPE, row, ordinal)
+
+ protected def toVarcharTColumnValue(row: Seq[_], ordinal: Int): TColumnValue
=
+ toTTypeColumnVal(STRING_TYPE, row, ordinal)
+
+ protected def toDefaultTColumnValue(row: Seq[_], ordinal: Int, sqlType:
Int): TColumnValue = {
+ val tStrValue = new TStringValue
+ if (row(ordinal) != null) {
+ tStrValue.setValue(
+ toHiveString(row(ordinal), sqlType))
+ }
+ TColumnValue.stringVal(tStrValue)
+ }
+
+ protected def toHiveString(data: Any, sqlType: Int): String =
+ (data, sqlType) match {
+ case (date: Date, DATE) => formatDate(date)
+ case (dateTime: LocalDateTime, TIMESTAMP) =>
formatLocalDateTime(dateTime)
+ case (decimal: java.math.BigDecimal, DECIMAL) => decimal.toPlainString
+ case (bigint: java.math.BigInteger, BIGINT) => bigint.toString()
+ case (other, _) => other.toString
+ }
+}
diff --git
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixRowSetHelper.scala
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/JdbcTRowSetGenerator.scala
similarity index 56%
rename from
externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixRowSetHelper.scala
rename to
externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/JdbcTRowSetGenerator.scala
index 67d9d09e5..519ac5f79 100644
---
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixRowSetHelper.scala
+++
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/JdbcTRowSetGenerator.scala
@@ -14,8 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kyuubi.engine.jdbc.phoenix
+package org.apache.kyuubi.engine.jdbc.schema
-import org.apache.kyuubi.engine.jdbc.schema.RowSetHelper
+import org.apache.kyuubi.engine.schema.AbstractTRowSetGenerator
-class PhoenixRowSetHelper extends RowSetHelper {}
+trait JdbcTRowSetGenerator extends AbstractTRowSetGenerator[Seq[Column],
Seq[_], Int] {
+ override protected def getColumnSizeFromSchemaType(schema: Seq[Column]): Int
= schema.length
+
+ override protected def getColumnType(schema: Seq[Column], ordinal: Int): Int
=
+ schema(ordinal).sqlType
+
+ override protected def isColumnNullAt(row: Seq[_], ordinal: Int): Boolean =
row(ordinal) == null
+
+ override protected def getColumnAs[T](row: Seq[_], ordinal: Int): T =
row(ordinal).asInstanceOf[T]
+}
diff --git
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/RowSetHelper.scala
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/RowSetHelper.scala
deleted file mode 100644
index e86232056..000000000
---
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/RowSetHelper.scala
+++ /dev/null
@@ -1,324 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kyuubi.engine.jdbc.schema
-
-import java.{lang, util}
-import java.sql.{Date, Types}
-import java.time.LocalDateTime
-
-import scala.collection.JavaConverters._
-
-import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
-import org.apache.kyuubi.util.RowSetUtils.{bitSetToBuffer, formatDate,
formatLocalDateTime}
-
-abstract class RowSetHelper {
-
- def toTRowSet(
- rows: Seq[List[_]],
- columns: List[Column],
- protocolVersion: TProtocolVersion): TRowSet = {
- if (protocolVersion.getValue <
TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6.getValue) {
- toRowBasedSet(rows, columns)
- } else {
- toColumnBasedSet(rows, columns)
- }
- }
-
- private def toRowBasedSet(rows: Seq[List[_]], columns: List[Column]):
TRowSet = {
- val rowSize = rows.length
- val tRows = new util.ArrayList[TRow](rowSize)
- var i = 0
- while (i < rowSize) {
- val row = rows(i)
- val tRow = new TRow()
- val columnSize = row.size
- var j = 0
- while (j < columnSize) {
- val columnValue = toTColumnValue(j, row, columns(i).sqlType)
- tRow.addToColVals(columnValue)
- j += 1
- }
- tRows.add(tRow)
- i += 1
- }
- new TRowSet(0, tRows)
- }
-
- private def toColumnBasedSet(rows: Seq[List[_]], columns: List[Column]):
TRowSet = {
- val size = rows.size
- val tRowSet = new TRowSet(0, new java.util.ArrayList[TRow](size))
- val columnSize = columns.length
- var i = 0
- while (i < columnSize) {
- val field = columns(i)
- val tColumn = toTColumn(rows, i, field.sqlType)
- tRowSet.addToColumns(tColumn)
- i += 1
- }
- tRowSet
- }
-
- protected def toTColumn(
- rows: Seq[Seq[Any]],
- ordinal: Int,
- sqlType: Int): TColumn = {
- sqlType match {
- case Types.BIT =>
- toBitTColumn(rows, ordinal)
-
- case Types.TINYINT =>
- toTinyIntTColumn(rows, ordinal)
-
- case Types.SMALLINT =>
- toSmallIntTColumn(rows, ordinal)
-
- case Types.INTEGER =>
- toIntegerTColumn(rows, ordinal)
-
- case Types.BIGINT =>
- toBigIntTColumn(rows, ordinal)
-
- case Types.REAL =>
- toRealTColumn(rows, ordinal)
-
- case Types.DOUBLE =>
- toDoubleTColumn(rows, ordinal)
-
- case Types.CHAR =>
- toCharTColumn(rows, ordinal)
-
- case Types.VARCHAR =>
- toVarcharTColumn(rows, ordinal)
-
- case _ =>
- toDefaultTColumn(rows, ordinal, sqlType)
- }
- }
-
- protected def toTColumnValue(ordinal: Int, row: List[Any], sqlType: Int):
TColumnValue = {
- sqlType match {
- case Types.BIT =>
- toBitTColumnValue(row, ordinal)
-
- case Types.TINYINT =>
- toTinyIntTColumnValue(row, ordinal)
-
- case Types.SMALLINT =>
- toSmallIntTColumnValue(row, ordinal)
-
- case Types.INTEGER =>
- toIntegerTColumnValue(row, ordinal)
-
- case Types.BIGINT =>
- toBigIntTColumnValue(row, ordinal)
-
- case Types.REAL =>
- toRealTColumnValue(row, ordinal)
-
- case Types.DOUBLE =>
- toDoubleTColumnValue(row, ordinal)
-
- case Types.CHAR =>
- toCharTColumnValue(row, ordinal)
-
- case Types.VARCHAR =>
- toVarcharTColumnValue(row, ordinal)
-
- case _ =>
- toDefaultTColumnValue(row, ordinal, sqlType)
- }
- }
-
- protected def getOrSetAsNull[T](
- rows: Seq[Seq[Any]],
- ordinal: Int,
- nulls: java.util.BitSet,
- defaultVal: T): java.util.List[T] = {
- val size = rows.length
- val ret = new java.util.ArrayList[T](size)
- var idx = 0
- while (idx < size) {
- val row = rows(idx)
- val isNull = row(ordinal) == null
- if (isNull) {
- nulls.set(idx, true)
- ret.add(idx, defaultVal)
- } else {
- ret.add(idx, row(ordinal).asInstanceOf[T])
- }
- idx += 1
- }
- ret
- }
-
- protected def toDefaultTColumn(rows: Seq[Seq[Any]], ordinal: Int, sqlType:
Int): TColumn = {
- val nulls = new java.util.BitSet()
- val rowSize = rows.length
- val values = new util.ArrayList[String](rowSize)
- var i = 0
- while (i < rowSize) {
- val row = rows(i)
- nulls.set(i, row(ordinal) == null)
- val value =
- if (row(ordinal) == null) {
- ""
- } else {
- toHiveString(row(ordinal), sqlType)
- }
- values.add(value)
- i += 1
- }
- TColumn.stringVal(new TStringColumn(values, nulls))
- }
-
- protected def toBitTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = {
- val nulls = new java.util.BitSet()
- val values = getOrSetAsNull[java.lang.Boolean](rows, ordinal, nulls, true)
- TColumn.boolVal(new TBoolColumn(values, nulls))
- }
-
- protected def toTinyIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn =
{
- val nulls = new java.util.BitSet()
- val values = getOrSetAsNull[java.lang.Byte](rows, ordinal, nulls, 0.toByte)
- TColumn.byteVal(new TByteColumn(values, nulls))
- }
-
- protected def toSmallIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn
= {
- val nulls = new java.util.BitSet()
- val values = getOrSetAsNull[java.lang.Short](rows, ordinal, nulls,
0.toShort)
- TColumn.i16Val(new TI16Column(values, nulls))
- }
-
- protected def toIntegerTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn =
{
- val nulls = new java.util.BitSet()
- val values = getOrSetAsNull[java.lang.Integer](rows, ordinal, nulls, 0)
- TColumn.i32Val(new TI32Column(values, nulls))
- }
-
- protected def toBigIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = {
- val nulls = new java.util.BitSet()
- val values = getOrSetAsNull[lang.Long](rows, ordinal, nulls, 0L)
- TColumn.i64Val(new TI64Column(values, nulls))
- }
-
- protected def toRealTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = {
- val nulls = new java.util.BitSet()
- val values = getOrSetAsNull[lang.Float](rows, ordinal, nulls, 0.toFloat)
- .asScala.map(n => java.lang.Double.valueOf(n.toString)).asJava
- TColumn.doubleVal(new TDoubleColumn(values, nulls))
- }
-
- protected def toDoubleTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = {
- val nulls = new java.util.BitSet()
- val values = getOrSetAsNull[lang.Double](rows, ordinal, nulls, 0.toDouble)
- TColumn.doubleVal(new TDoubleColumn(values, nulls))
- }
-
- protected def toCharTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = {
- toVarcharTColumn(rows, ordinal)
- }
-
- protected def toVarcharTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn =
{
- val nulls = new java.util.BitSet()
- val values = getOrSetAsNull[String](rows, ordinal, nulls, "")
- TColumn.stringVal(new TStringColumn(values, nulls))
- }
-
- // ==========================================================
-
- protected def toBitTColumnValue(row: List[Any], ordinal: Int): TColumnValue
= {
- val boolValue = new TBoolValue
- if (row(ordinal) != null)
boolValue.setValue(row(ordinal).asInstanceOf[Boolean])
- TColumnValue.boolVal(boolValue)
- }
-
- protected def toTinyIntTColumnValue(row: List[Any], ordinal: Int):
TColumnValue = {
- val byteValue = new TByteValue
- if (row(ordinal) != null)
byteValue.setValue(row(ordinal).asInstanceOf[Byte])
- TColumnValue.byteVal(byteValue)
- }
-
- protected def toSmallIntTColumnValue(row: List[Any], ordinal: Int):
TColumnValue = {
- val tI16Value = new TI16Value
- if (row(ordinal) != null)
tI16Value.setValue(row(ordinal).asInstanceOf[Short])
- TColumnValue.i16Val(tI16Value)
- }
-
- protected def toIntegerTColumnValue(row: List[Any], ordinal: Int):
TColumnValue = {
- val tI32Value = new TI32Value
- if (row(ordinal) != null)
tI32Value.setValue(row(ordinal).asInstanceOf[Int])
- TColumnValue.i32Val(tI32Value)
- }
-
- protected def toBigIntTColumnValue(row: List[Any], ordinal: Int):
TColumnValue = {
- val tI64Value = new TI64Value
- if (row(ordinal) != null)
tI64Value.setValue(row(ordinal).asInstanceOf[Long])
- TColumnValue.i64Val(tI64Value)
- }
-
- protected def toRealTColumnValue(row: List[Any], ordinal: Int): TColumnValue
= {
- val tDoubleValue = new TDoubleValue
- if (row(ordinal) != null) {
- val doubleValue =
java.lang.Double.valueOf(row(ordinal).asInstanceOf[Float].toString)
- tDoubleValue.setValue(doubleValue)
- }
- TColumnValue.doubleVal(tDoubleValue)
- }
-
- protected def toDoubleTColumnValue(row: List[Any], ordinal: Int):
TColumnValue = {
- val tDoubleValue = new TDoubleValue
- if (row(ordinal) != null)
tDoubleValue.setValue(row(ordinal).asInstanceOf[Double])
- TColumnValue.doubleVal(tDoubleValue)
- }
-
- protected def toCharTColumnValue(row: List[Any], ordinal: Int): TColumnValue
= {
- toVarcharTColumnValue(row, ordinal)
- }
-
- protected def toVarcharTColumnValue(row: List[Any], ordinal: Int):
TColumnValue = {
- val tStringValue = new TStringValue
- if (row(ordinal) != null)
tStringValue.setValue(row(ordinal).asInstanceOf[String])
- TColumnValue.stringVal(tStringValue)
- }
-
- protected def toDefaultTColumnValue(
- row: List[Any],
- ordinal: Int,
- sqlType: Int): TColumnValue = {
- val tStrValue = new TStringValue
- if (row(ordinal) != null) {
- tStrValue.setValue(
- toHiveString(row(ordinal), sqlType))
- }
- TColumnValue.stringVal(tStrValue)
- }
-
- protected def toHiveString(data: Any, sqlType: Int): String = {
- (data, sqlType) match {
- case (date: Date, Types.DATE) =>
- formatDate(date)
- case (dateTime: LocalDateTime, Types.TIMESTAMP) =>
- formatLocalDateTime(dateTime)
- case (decimal: java.math.BigDecimal, Types.DECIMAL) =>
- decimal.toPlainString
- case (bigint: java.math.BigInteger, Types.BIGINT) =>
- bigint.toString()
- case (other, _) =>
- other.toString
- }
- }
-}
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/schema/AbstractTRowSetGenerator.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/schema/AbstractTRowSetGenerator.scala
index 365ed7298..3433bc2b0 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/schema/AbstractTRowSetGenerator.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/schema/AbstractTRowSetGenerator.scala
@@ -121,7 +121,7 @@ trait AbstractTRowSetGenerator[SchemaT, RowT, ColumnT] {
if (!isNull) byteValue.setValue(getColumnAs[java.lang.Byte](row,
ordinal))
TColumnValue.byteVal(byteValue)
- case TINYINT_TYPE =>
+ case SMALLINT_TYPE | TINYINT_TYPE =>
val tI16Value = new TI16Value
if (!isNull) tI16Value.setValue(getColumnAs[java.lang.Short](row,
ordinal))
TColumnValue.i16Val(tI16Value)
@@ -167,11 +167,7 @@ trait AbstractTRowSetGenerator[SchemaT, RowT, ColumnT] {
val values = getOrSetAsNull[java.lang.Byte](rows, ordinal, nulls,
0.toByte)
TColumn.byteVal(new TByteColumn(values, nulls))
- case SMALLINT_TYPE =>
- val values = getOrSetAsNull[java.lang.Short](rows, ordinal, nulls,
0.toShort)
- TColumn.i16Val(new TI16Column(values, nulls))
-
- case TINYINT_TYPE =>
+ case SMALLINT_TYPE | TINYINT_TYPE =>
val values = getOrSetAsNull[java.lang.Short](rows, ordinal, nulls,
0.toShort)
TColumn.i16Val(new TI16Column(values, nulls))
@@ -196,6 +192,10 @@ trait AbstractTRowSetGenerator[SchemaT, RowT, ColumnT] {
val values = getOrSetAsNull[java.lang.String](rows, ordinal, nulls, "")
TColumn.stringVal(new TStringColumn(values, nulls))
+ case CHAR_TYPE =>
+ val values = getOrSetAsNull[java.lang.String](rows, ordinal, nulls, "")
+ TColumn.stringVal(new TStringColumn(values, nulls))
+
case ARRAY_TYPE =>
val values = getOrSetAsNull[Array[Byte]](rows, ordinal, nulls, Array())
.asScala