This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new aac4503bb [KYUUBI #5870] Directly mapping engine's data type to Java
type for TRowSet generation
aac4503bb is described below
commit aac4503bba1640320e87f0f6cf721cb80a48af1e
Author: Bowen Liang <[email protected]>
AuthorDate: Thu Dec 21 12:16:49 2023 +0800
[KYUUBI #5870] Directly mapping engine's data type to Java type for TRowSet
generation
# :mag: Description
## Issue References ๐
As described.
## Describe Your Solution ๐ง
Previously, the TRowSetGenerator built a chain of `Engine's data type` to
`Thrift data type` and then to `Java data type`.
This limits the possibility of overriding the implementation of Java data
types handlers and coupling the `Engine's data type` to specific `Thrift data
type`.
1. This PR directly mapping `Engine's data type` to `Java data type`
focusing converting the upstream Engine result set's value to real data type.
2. it also decoupled the `Thrift data type` to Java data types, eg.
Thrift's SMALL_INT and TINY_INT can be now ignored, and making the engines data
type mapped to Java's Short data type.
3. it's now able to distinguish similar terms of data types with different
meanings, like Java's `byte` or `byte array` and `tiny int` or `samll int` in
engines and thrift. Only Java data types are represented in
`TRowSetColumnGenerator` and `TRowSetColumnValueGenerator`
## Types of changes :bookmark:
- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
## Test Plan ๐งช
#### Behavior Without This Pull Request :coffin:
No behaviour changes.
#### Behavior With This Pull Request :tada:
No behaviour changes.
#### Related Unit Tests
CI tests.
---
# Checklists
## ๐ Author Self Checklist
- [x] My code follows the [style
guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html)
of this project
- [x] I have performed a self-review
- [x] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my feature
works
- [x] New and existing unit tests pass locally with my changes
- [x] This patch was not authored or co-authored using [Generative
Tooling](https://www.apache.org/legal/generative-tooling.html)
## ๐ Committer Pre-Merge Checklist
- [x] Pull request title is okay.
- [x] No license issues.
- [x] Milestone correctly set?
- [x] Test coverage is ok
- [x] Assignees are selected.
- [x] Minimum number of approvals
- [x] No changes are requested
**Be nice. Be informative.**
Closes #5870 from bowenliang123/rowset-map.
Closes #5870
51740a00a [Bowen Liang] update
3dc48f4b9 [liangbowen] type mapping
Lead-authored-by: Bowen Liang <[email protected]>
Co-authored-by: liangbowen <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../engine/chat/schema/ChatTRowSetGenerator.scala | 14 +-
.../flink/schema/FlinkTRowSetGenerator.scala | 135 ++++++-------
.../engine/jdbc/dialect/PhoenixDialect.scala | 6 +-
.../engine/jdbc/doris/DorisTRowSetGenerator.scala | 18 +-
.../engine/jdbc/mysql/MySQLTRowSetGenerator.scala | 23 +--
.../PhoenixTRowSetGenerator.scala} | 12 +-
.../postgresql/PostgreSQLTRowSetGenerator.scala | 2 +-
.../jdbc/schema/DefaultJdbcTRowSetGenerator.scala | 115 +++++------
.../engine/jdbc/schema/JdbcTRowSetGenerator.scala | 9 +-
.../spark/schema/SparkArrowTRowSetGenerator.scala | 10 +-
.../spark/schema/SparkTRowSetGenerator.scala | 77 ++++----
.../trino/schema/TrinoTRowSetGenerator.scala | 82 +++-----
.../kyuubi/engine/result/TColumnGenerator.scala | 105 +++++++++++
.../engine/result/TColumnValueGenerator.scala | 97 ++++++++++
.../kyuubi/engine/result/TRowSetColumnGetter.scala | 13 +-
.../kyuubi/engine/result/TRowSetGenerator.scala | 77 ++++++++
.../engine/schema/AbstractTRowSetGenerator.scala | 210 ---------------------
.../scala/org/apache/kyuubi/util/RowSetUtils.scala | 7 -
.../kyuubi/sql/schema/ServerTRowSetGenerator.scala | 60 +++---
19 files changed, 506 insertions(+), 566 deletions(-)
diff --git
a/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/schema/ChatTRowSetGenerator.scala
b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/schema/ChatTRowSetGenerator.scala
index 990a19764..7e6a121be 100644
---
a/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/schema/ChatTRowSetGenerator.scala
+++
b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/schema/ChatTRowSetGenerator.scala
@@ -18,31 +18,29 @@
package org.apache.kyuubi.engine.chat.schema
import org.apache.kyuubi.engine.chat.schema.ChatTRowSetGenerator._
-import org.apache.kyuubi.engine.schema.AbstractTRowSetGenerator
+import org.apache.kyuubi.engine.result.TRowSetGenerator
import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
-import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TTypeId._
class ChatTRowSetGenerator
- extends AbstractTRowSetGenerator[Seq[String], Seq[String], String] {
+ extends TRowSetGenerator[Seq[String], Seq[String], String] {
override def getColumnSizeFromSchemaType(schema: Seq[String]): Int =
schema.length
override def getColumnType(schema: Seq[String], ordinal: Int): String =
COL_STRING_TYPE
- override protected def isColumnNullAt(row: Seq[String], ordinal: Int):
Boolean =
- row(ordinal) == null
+ override def isColumnNullAt(row: Seq[String], ordinal: Int): Boolean =
row(ordinal) == null
override def getColumnAs[T](row: Seq[String], ordinal: Int): T =
row(ordinal).asInstanceOf[T]
override def toTColumn(rows: Seq[Seq[String]], ordinal: Int, typ: String):
TColumn =
typ match {
- case COL_STRING_TYPE => toTTypeColumn(STRING_TYPE, rows, ordinal)
+ case COL_STRING_TYPE => asStringTColumn(rows, ordinal)
case otherType => throw new UnsupportedOperationException(s"type
$otherType")
}
- override def toTColumnValue(ordinal: Int, row: Seq[String], types:
Seq[String]): TColumnValue =
+ override def toTColumnValue(row: Seq[String], ordinal: Int, types:
Seq[String]): TColumnValue =
getColumnType(types, ordinal) match {
- case "String" => toTTypeColumnVal(STRING_TYPE, row, ordinal)
+ case COL_STRING_TYPE => asStringTColumnValue(row, ordinal)
case otherType => throw new UnsupportedOperationException(s"type
$otherType")
}
}
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/FlinkTRowSetGenerator.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/FlinkTRowSetGenerator.scala
index b53aab47f..463b66111 100644
---
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/FlinkTRowSetGenerator.scala
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/FlinkTRowSetGenerator.scala
@@ -18,21 +18,17 @@ package org.apache.kyuubi.engine.flink.schema
import java.time.{Instant, ZonedDateTime, ZoneId}
-import scala.collection.JavaConverters._
-
import org.apache.flink.table.data.StringData
import org.apache.flink.table.types.logical._
import org.apache.flink.types.Row
import org.apache.kyuubi.engine.flink.result.ResultSet
import org.apache.kyuubi.engine.flink.schema.RowSet.{toHiveString,
TIMESTAMP_LZT_FORMATTER}
-import org.apache.kyuubi.engine.schema.AbstractTRowSetGenerator
+import org.apache.kyuubi.engine.result.TRowSetGenerator
import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
-import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TTypeId._
-import org.apache.kyuubi.util.RowSetUtils.bitSetToBuffer
class FlinkTRowSetGenerator(zoneId: ZoneId)
- extends AbstractTRowSetGenerator[ResultSet, Row, LogicalType] {
+ extends TRowSetGenerator[ResultSet, Row, LogicalType] {
override def getColumnSizeFromSchemaType(schema: ResultSet): Int =
schema.columns.size
override def getColumnType(schema: ResultSet, ordinal: Int): LogicalType =
@@ -42,99 +38,74 @@ class FlinkTRowSetGenerator(zoneId: ZoneId)
override def getColumnAs[T](row: Row, ordinal: Int): T =
row.getFieldAs[T](ordinal)
- override def toTColumnValue(ordinal: Int, row: Row, types: ResultSet):
TColumnValue = {
+ override def toTColumnValue(row: Row, ordinal: Int, types: ResultSet):
TColumnValue = {
getColumnType(types, ordinal) match {
- case _: BooleanType => toTTypeColumnVal(BOOLEAN_TYPE, row, ordinal)
- case _: TinyIntType => toTTypeColumnVal(BINARY_TYPE, row, ordinal)
- case _: SmallIntType => toTTypeColumnVal(TINYINT_TYPE, row, ordinal)
- case _: IntType => toTTypeColumnVal(INT_TYPE, row, ordinal)
- case _: BigIntType => toTTypeColumnVal(BIGINT_TYPE, row, ordinal)
- case _: DoubleType => toTTypeColumnVal(DOUBLE_TYPE, row, ordinal)
- case _: FloatType => toTTypeColumnVal(FLOAT_TYPE, row, ordinal)
+ case _: BooleanType => asBooleanTColumnValue(row, ordinal)
+ case _: TinyIntType => asByteTColumnValue(row, ordinal)
+ case _: SmallIntType => asShortTColumnValue(row, ordinal)
+ case _: IntType => asIntegerTColumnValue(row, ordinal)
+ case _: BigIntType => asLongTColumnValue(row, ordinal)
+ case _: DoubleType => asDoubleTColumnValue(row, ordinal)
+ case _: FloatType => asFloatTColumnValue(row, ordinal)
case t @ (_: VarCharType | _: CharType) =>
- val tStringValue = new TStringValue
- val fieldValue = row.getField(ordinal)
- fieldValue match {
- case value: String =>
- tStringValue.setValue(value)
- case value: StringData =>
- tStringValue.setValue(value.toString)
- case null =>
- tStringValue.setValue(null)
- case other =>
- throw new IllegalArgumentException(
- s"Unsupported conversion class ${other.getClass} " +
- s"for type ${t.getClass}.")
- }
- TColumnValue.stringVal(tStringValue)
+ asStringTColumnValue(
+ row,
+ ordinal,
+ convertFunc = {
+ case value: String => value
+ case value: StringData => value.toString
+ case null => null
+ case other => throw new IllegalArgumentException(
+ s"Unsupported conversion class ${other.getClass} for type
${t.getClass}.")
+ })
case _: LocalZonedTimestampType =>
- val tStringValue = new TStringValue
- val fieldValue = row.getField(ordinal)
- tStringValue.setValue(TIMESTAMP_LZT_FORMATTER.format(
- ZonedDateTime.ofInstant(fieldValue.asInstanceOf[Instant], zoneId)))
- TColumnValue.stringVal(tStringValue)
- case t =>
- val tStringValue = new TStringValue
- if (row.getField(ordinal) != null) {
- tStringValue.setValue(toHiveString((row.getField(ordinal), t)))
- }
- TColumnValue.stringVal(tStringValue)
+ asStringTColumnValue(
+ row,
+ ordinal,
+ rawValue =>
+ TIMESTAMP_LZT_FORMATTER.format(
+ ZonedDateTime.ofInstant(rawValue.asInstanceOf[Instant], zoneId)))
+ case t => asStringTColumnValue(row, ordinal, rawValue =>
toHiveString((rawValue, t)))
}
}
override def toTColumn(rows: Seq[Row], ordinal: Int, logicalType:
LogicalType): TColumn = {
- val nulls = new java.util.BitSet()
// for each column, determine the conversion class by sampling the first
non-value value
// if there's no row, set the entire column empty
- val sampleField = rows.iterator.map(_.getField(ordinal)).find(_ ne
null).orNull
logicalType match {
- case _: BooleanType => toTTypeColumn(BOOLEAN_TYPE, rows, ordinal)
- case _: TinyIntType => toTTypeColumn(BINARY_TYPE, rows, ordinal)
- case _: SmallIntType => toTTypeColumn(TINYINT_TYPE, rows, ordinal)
- case _: IntType => toTTypeColumn(INT_TYPE, rows, ordinal)
- case _: BigIntType => toTTypeColumn(BIGINT_TYPE, rows, ordinal)
- case _: FloatType => toTTypeColumn(FLOAT_TYPE, rows, ordinal)
- case _: DoubleType => toTTypeColumn(DOUBLE_TYPE, rows, ordinal)
+ case _: BooleanType => asBooleanTColumn(rows, ordinal)
+ case _: TinyIntType => asByteTColumn(rows, ordinal)
+ case _: SmallIntType => asShortTColumn(rows, ordinal)
+ case _: IntType => asIntegerTColumn(rows, ordinal)
+ case _: BigIntType => asLongTColumn(rows, ordinal)
+ case _: FloatType => asFloatTColumn(rows, ordinal)
+ case _: DoubleType => asDoubleTColumn(rows, ordinal)
case t @ (_: VarCharType | _: CharType) =>
- val values: java.util.List[String] = new java.util.ArrayList[String](0)
+ val sampleField = rows.iterator.map(_.getField(ordinal)).find(_ ne
null).orNull
sampleField match {
- case _: String =>
- values.addAll(getOrSetAsNull[String](rows, ordinal, nulls, ""))
+ case _: String => asStringTColumn(rows, ordinal)
case _: StringData =>
- val stringDataValues =
- getOrSetAsNull[StringData](rows, ordinal, nulls,
StringData.fromString(""))
- stringDataValues.forEach(e => values.add(e.toString))
- case null =>
- values.addAll(getOrSetAsNull[String](rows, ordinal, nulls, ""))
- case other =>
- throw new IllegalArgumentException(
- s"Unsupported conversion class ${other.getClass} " +
- s"for type ${t.getClass}.")
+ asStringTColumn(
+ rows,
+ ordinal,
+ convertFunc = (row, ordinal) => getColumnAs[StringData](row,
ordinal).toString)
+ case null => asStringTColumn(rows, ordinal)
+ case other => throw new IllegalArgumentException(
+ s"Unsupported conversion class ${other.getClass} for type
${t.getClass}.")
}
- TColumn.stringVal(new TStringColumn(values, nulls))
case _: LocalZonedTimestampType =>
- val values = getOrSetAsNull[Instant](rows, ordinal, nulls,
Instant.EPOCH)
- .toArray().map(v =>
+ asStringTColumn(
+ rows,
+ ordinal,
+
TIMESTAMP_LZT_FORMATTER.format(ZonedDateTime.ofInstant(Instant.EPOCH, zoneId)),
+ (row, ordinal) =>
TIMESTAMP_LZT_FORMATTER.format(
- ZonedDateTime.ofInstant(v.asInstanceOf[Instant], zoneId)))
- TColumn.stringVal(new TStringColumn(values.toList.asJava, nulls))
+ ZonedDateTime.ofInstant(getColumnAs[Instant](row, ordinal),
zoneId)))
case _ =>
- var i = 0
- val rowSize = rows.length
- val values = new java.util.ArrayList[String](rowSize)
- while (i < rowSize) {
- val row = rows(i)
- nulls.set(i, row.getField(ordinal) == null)
- val value =
- if (row.getField(ordinal) == null) {
- ""
- } else {
- toHiveString((row.getField(ordinal), logicalType))
- }
- values.add(value)
- i += 1
- }
- TColumn.stringVal(new TStringColumn(values, nulls))
+ asStringTColumn(
+ rows,
+ ordinal,
+ convertFunc = (row, ordinal) => toHiveString((row.getField(ordinal),
logicalType)))
}
}
diff --git
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/PhoenixDialect.scala
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/PhoenixDialect.scala
index e12f5d04b..61440ac50 100644
---
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/PhoenixDialect.scala
+++
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/PhoenixDialect.scala
@@ -22,8 +22,8 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.commons.lang3.StringUtils
-import org.apache.kyuubi.engine.jdbc.phoenix.PhoenixSchemaHelper
-import org.apache.kyuubi.engine.jdbc.schema.{DefaultJdbcTRowSetGenerator,
JdbcTRowSetGenerator, SchemaHelper}
+import org.apache.kyuubi.engine.jdbc.phoenix.{PhoenixSchemaHelper,
PhoenixTRowSetGenerator}
+import org.apache.kyuubi.engine.jdbc.schema.{JdbcTRowSetGenerator,
SchemaHelper}
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session
@@ -100,7 +100,7 @@ class PhoenixDialect extends JdbcDialect {
query.toString()
}
- override def getTRowSetGenerator(): JdbcTRowSetGenerator = new
DefaultJdbcTRowSetGenerator
+ override def getTRowSetGenerator(): JdbcTRowSetGenerator = new
PhoenixTRowSetGenerator
override def getSchemaHelper(): SchemaHelper = {
new PhoenixSchemaHelper
diff --git
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/doris/DorisTRowSetGenerator.scala
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/doris/DorisTRowSetGenerator.scala
index 26c64d81b..b77a7b310 100644
---
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/doris/DorisTRowSetGenerator.scala
+++
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/doris/DorisTRowSetGenerator.scala
@@ -16,20 +16,6 @@
*/
package org.apache.kyuubi.engine.jdbc.doris
-import org.apache.kyuubi.engine.jdbc.schema.DefaultJdbcTRowSetGenerator
-import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TColumn, TColumnValue}
+import org.apache.kyuubi.engine.jdbc.mysql.MySQLTRowSetGenerator
-class DorisTRowSetGenerator extends DefaultJdbcTRowSetGenerator {
-
- override def toTinyIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
- toIntegerTColumn(rows, ordinal)
-
- override def toSmallIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
- toIntegerTColumn(rows, ordinal)
-
- override def toTinyIntTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
- toIntegerTColumnValue(row, ordinal)
-
- override def toSmallIntTColumnValue(row: Seq[_], ordinal: Int): TColumnValue
=
- toIntegerTColumnValue(row, ordinal)
-}
+class DorisTRowSetGenerator extends MySQLTRowSetGenerator {}
diff --git
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/mysql/MySQLTRowSetGenerator.scala
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/mysql/MySQLTRowSetGenerator.scala
index 1ed605370..c029131fa 100644
---
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/mysql/MySQLTRowSetGenerator.scala
+++
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/mysql/MySQLTRowSetGenerator.scala
@@ -16,6 +16,7 @@
*/
package org.apache.kyuubi.engine.jdbc.mysql
+import java.lang.{Long => JLong}
import java.sql.Types
import org.apache.kyuubi.engine.jdbc.schema.DefaultJdbcTRowSetGenerator
@@ -24,45 +25,45 @@ import
org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TColumn, TColumnValue}
class MySQLTRowSetGenerator extends DefaultJdbcTRowSetGenerator {
override def toTinyIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
- toIntegerTColumn(rows, ordinal)
+ asIntegerTColumn(rows, ordinal)
override def toSmallIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
- toIntegerTColumn(rows, ordinal)
+ asIntegerTColumn(rows, ordinal)
override def toTinyIntTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
- toIntegerTColumnValue(row, ordinal)
+ asIntegerTColumnValue(row, ordinal)
override def toSmallIntTColumnValue(row: Seq[_], ordinal: Int): TColumnValue
=
- toIntegerTColumnValue(row, ordinal)
+ asIntegerTColumnValue(row, ordinal)
override def toIntegerTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn = {
val colHead = if (rows.isEmpty) None else rows.head(ordinal)
colHead match {
case _: Integer => super.toIntegerTColumn(rows, ordinal)
- case _: java.lang.Long => super.toBigIntTColumn(rows, ordinal)
+ case _: JLong => super.toBigIntTColumn(rows, ordinal)
case _ => super.toDefaultTColumn(rows, ordinal, Types.INTEGER)
}
}
- override protected def toIntegerTColumnValue(row: Seq[_], ordinal: Int):
TColumnValue = {
+ override def toIntegerTColumnValue(row: Seq[_], ordinal: Int): TColumnValue
= {
row(ordinal) match {
case _: Integer => super.toIntegerTColumnValue(row, ordinal)
- case _: java.lang.Long => super.toBigIntTColumnValue(row, ordinal)
+ case _: JLong => super.toBigIntTColumnValue(row, ordinal)
case _ => super.toDefaultTColumnValue(row, ordinal, Types.INTEGER)
}
}
- override protected def toBigIntTColumn(rows: Seq[Seq[_]], ordinal: Int):
TColumn = {
+ override def toBigIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn = {
val colHead = if (rows.isEmpty) None else rows.head(ordinal)
colHead match {
- case v: java.lang.Long => super.toBigIntTColumn(rows, ordinal)
+ case _: JLong => super.toBigIntTColumn(rows, ordinal)
case _ => super.toDefaultTColumn(rows, ordinal, Types.BIGINT)
}
}
- override protected def toBigIntTColumnValue(row: Seq[_], ordinal: Int):
TColumnValue =
+ override def toBigIntTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
row(ordinal) match {
- case v: java.lang.Long => super.toBigIntTColumnValue(row, ordinal)
+ case _: JLong => super.toBigIntTColumnValue(row, ordinal)
case _ => super.toDefaultTColumnValue(row, ordinal, Types.BIGINT)
}
}
diff --git
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/postgresql/PostgreSQLTRowSetGenerator.scala
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixTRowSetGenerator.scala
similarity index 66%
copy from
externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/postgresql/PostgreSQLTRowSetGenerator.scala
copy to
externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixTRowSetGenerator.scala
index 0d02aa732..f8740fce4 100644
---
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/postgresql/PostgreSQLTRowSetGenerator.scala
+++
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixTRowSetGenerator.scala
@@ -14,16 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kyuubi.engine.jdbc.postgresql
+package org.apache.kyuubi.engine.jdbc.phoenix
import org.apache.kyuubi.engine.jdbc.schema.DefaultJdbcTRowSetGenerator
-import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TColumn, TColumnValue}
-class PostgreSQLTRowSetGenerator extends DefaultJdbcTRowSetGenerator {
-
- override def toSmallIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
- toIntegerTColumn(rows, ordinal)
-
- override protected def toSmallIntTColumnValue(row: Seq[_], ordinal: Int):
TColumnValue =
- toIntegerTColumnValue(row, ordinal)
-}
+class PhoenixTRowSetGenerator extends DefaultJdbcTRowSetGenerator {}
diff --git
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/postgresql/PostgreSQLTRowSetGenerator.scala
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/postgresql/PostgreSQLTRowSetGenerator.scala
index 0d02aa732..104b3b15d 100644
---
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/postgresql/PostgreSQLTRowSetGenerator.scala
+++
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/postgresql/PostgreSQLTRowSetGenerator.scala
@@ -24,6 +24,6 @@ class PostgreSQLTRowSetGenerator extends
DefaultJdbcTRowSetGenerator {
override def toSmallIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
toIntegerTColumn(rows, ordinal)
- override protected def toSmallIntTColumnValue(row: Seq[_], ordinal: Int):
TColumnValue =
+ override def toSmallIntTColumnValue(row: Seq[_], ordinal: Int): TColumnValue
=
toIntegerTColumnValue(row, ordinal)
}
diff --git
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/DefaultJdbcTRowSetGenerator.scala
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/DefaultJdbcTRowSetGenerator.scala
index a2ad762be..2621379c1 100644
---
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/DefaultJdbcTRowSetGenerator.scala
+++
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/DefaultJdbcTRowSetGenerator.scala
@@ -19,11 +19,9 @@ package org.apache.kyuubi.engine.jdbc.schema
import java.sql.Date
import java.sql.Types._
import java.time.LocalDateTime
-import java.util
import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
-import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TTypeId._
-import org.apache.kyuubi.util.RowSetUtils.{bitSetToBuffer, formatDate,
formatLocalDateTime}
+import org.apache.kyuubi.util.RowSetUtils.{formatDate, formatLocalDateTime}
class DefaultJdbcTRowSetGenerator extends JdbcTRowSetGenerator {
@@ -41,7 +39,7 @@ class DefaultJdbcTRowSetGenerator extends
JdbcTRowSetGenerator {
case _ => toDefaultTColumn(rows, ordinal, sqlType)
}
- override def toTColumnValue(ordinal: Int, row: Seq[_], types: Seq[Column]):
TColumnValue =
+ override def toTColumnValue(row: Seq[_], ordinal: Int, types: Seq[Column]):
TColumnValue = {
getColumnType(types, ordinal) match {
case BIT => toBitTColumnValue(row, ordinal)
case TINYINT => toTinyIntTColumnValue(row, ordinal)
@@ -54,93 +52,74 @@ class DefaultJdbcTRowSetGenerator extends
JdbcTRowSetGenerator {
case VARCHAR => toVarcharTColumnValue(row, ordinal)
case otherType => toDefaultTColumnValue(row, ordinal, otherType)
}
-
- protected def toDefaultTColumn(rows: Seq[Seq[_]], ordinal: Int, sqlType:
Int): TColumn = {
- val nulls = new java.util.BitSet()
- val rowSize = rows.length
- val values = new util.ArrayList[String](rowSize)
- var i = 0
- while (i < rowSize) {
- val row = rows(i)
- nulls.set(i, row(ordinal) == null)
- val value =
- if (row(ordinal) == null) {
- ""
- } else {
- toHiveString(row(ordinal), sqlType)
- }
- values.add(value)
- i += 1
- }
- TColumn.stringVal(new TStringColumn(values, nulls))
}
- protected def toBitTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
- toTTypeColumn(BOOLEAN_TYPE, rows, ordinal)
+ def toDefaultTColumn(rows: Seq[Seq[_]], ordinal: Int, sqlType: Int): TColumn
=
+ asStringTColumn(
+ rows,
+ ordinal,
+ convertFunc = (row, ordinal) => toHiveString(row(ordinal), sqlType))
+
+ def toBitTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
+ asBooleanTColumn(rows, ordinal)
- protected def toTinyIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
- toTTypeColumn(TINYINT_TYPE, rows, ordinal)
+ def toTinyIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
+ asShortTColumn(rows, ordinal)
- protected def toSmallIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
- toTTypeColumn(SMALLINT_TYPE, rows, ordinal)
+ def toSmallIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
+ asShortTColumn(rows, ordinal)
- protected def toIntegerTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
- toTTypeColumn(INT_TYPE, rows, ordinal)
+ def toIntegerTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
+ asIntegerTColumn(rows, ordinal)
- protected def toBigIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
- toTTypeColumn(BIGINT_TYPE, rows, ordinal)
+ def toBigIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
+ asLongTColumn(rows, ordinal)
- protected def toRealTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
- toTTypeColumn(FLOAT_TYPE, rows, ordinal)
+ def toRealTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
+ asFloatTColumn(rows, ordinal)
- protected def toDoubleTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
- toTTypeColumn(DOUBLE_TYPE, rows, ordinal)
+ def toDoubleTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
+ asDoubleTColumn(rows, ordinal)
- protected def toCharTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
- toTTypeColumn(CHAR_TYPE, rows, ordinal)
+ def toCharTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
+ asStringTColumn(rows, ordinal)
- protected def toVarcharTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
- toTTypeColumn(STRING_TYPE, rows, ordinal)
+ def toVarcharTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
+ asStringTColumn(rows, ordinal)
// ==========================================================
- protected def toBitTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
- toTTypeColumnVal(BOOLEAN_TYPE, row, ordinal)
+ def toBitTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
+ asBooleanTColumnValue(row, ordinal)
- protected def toTinyIntTColumnValue(row: Seq[_], ordinal: Int): TColumnValue
=
- toTTypeColumnVal(TINYINT_TYPE, row, ordinal)
+ def toTinyIntTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
+ asShortTColumnValue(row, ordinal)
- protected def toSmallIntTColumnValue(row: Seq[_], ordinal: Int):
TColumnValue =
- toTTypeColumnVal(SMALLINT_TYPE, row, ordinal)
+ def toSmallIntTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
+ asShortTColumnValue(row, ordinal)
- protected def toIntegerTColumnValue(row: Seq[_], ordinal: Int): TColumnValue
=
- toTTypeColumnVal(INT_TYPE, row, ordinal)
+ def toIntegerTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
+ asIntegerTColumnValue(row, ordinal)
- protected def toBigIntTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
- toTTypeColumnVal(BIGINT_TYPE, row, ordinal)
+ def toBigIntTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
+ asLongTColumnValue(row, ordinal)
- protected def toRealTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
- toTTypeColumnVal(FLOAT_TYPE, row, ordinal)
+ def toRealTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
+ asFloatTColumnValue(row, ordinal)
- protected def toDoubleTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
- toTTypeColumnVal(DOUBLE_TYPE, row, ordinal)
+ def toDoubleTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
+ asDoubleTColumnValue(row, ordinal)
- protected def toCharTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
- toTTypeColumnVal(STRING_TYPE, row, ordinal)
+ def toCharTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
+ asStringTColumnValue(row, ordinal)
- protected def toVarcharTColumnValue(row: Seq[_], ordinal: Int): TColumnValue
=
- toTTypeColumnVal(STRING_TYPE, row, ordinal)
+ def toVarcharTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
+ asStringTColumnValue(row, ordinal)
- protected def toDefaultTColumnValue(row: Seq[_], ordinal: Int, sqlType:
Int): TColumnValue = {
- val tStrValue = new TStringValue
- if (row(ordinal) != null) {
- tStrValue.setValue(
- toHiveString(row(ordinal), sqlType))
- }
- TColumnValue.stringVal(tStrValue)
- }
+ def toDefaultTColumnValue(row: Seq[_], ordinal: Int, sqlType: Int):
TColumnValue =
+ asStringTColumnValue(row, ordinal, rawValue => toHiveString(rawValue,
sqlType))
- protected def toHiveString(data: Any, sqlType: Int): String =
+ def toHiveString(data: Any, sqlType: Int): String =
(data, sqlType) match {
case (date: Date, DATE) => formatDate(date)
case (dateTime: LocalDateTime, TIMESTAMP) =>
formatLocalDateTime(dateTime)
diff --git
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/JdbcTRowSetGenerator.scala
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/JdbcTRowSetGenerator.scala
index 519ac5f79..233a6a799 100644
---
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/JdbcTRowSetGenerator.scala
+++
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/JdbcTRowSetGenerator.scala
@@ -16,13 +16,12 @@
*/
package org.apache.kyuubi.engine.jdbc.schema
-import org.apache.kyuubi.engine.schema.AbstractTRowSetGenerator
+import org.apache.kyuubi.engine.result.TRowSetGenerator
-trait JdbcTRowSetGenerator extends AbstractTRowSetGenerator[Seq[Column],
Seq[_], Int] {
- override protected def getColumnSizeFromSchemaType(schema: Seq[Column]): Int
= schema.length
+trait JdbcTRowSetGenerator extends TRowSetGenerator[Seq[Column], Seq[_], Int] {
+ override def getColumnSizeFromSchemaType(schema: Seq[Column]): Int =
schema.length
- override protected def getColumnType(schema: Seq[Column], ordinal: Int): Int
=
- schema(ordinal).sqlType
+ override def getColumnType(schema: Seq[Column], ordinal: Int): Int =
schema(ordinal).sqlType
override protected def isColumnNullAt(row: Seq[_], ordinal: Int): Boolean =
row(ordinal) == null
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SparkArrowTRowSetGenerator.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SparkArrowTRowSetGenerator.scala
index ded022ad0..054df0dd6 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SparkArrowTRowSetGenerator.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SparkArrowTRowSetGenerator.scala
@@ -21,12 +21,11 @@ import java.nio.ByteBuffer
import org.apache.spark.sql.types._
-import org.apache.kyuubi.engine.schema.AbstractTRowSetGenerator
+import org.apache.kyuubi.engine.result.TRowSetGenerator
import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
-import org.apache.kyuubi.util.RowSetUtils.bitSetToBuffer
class SparkArrowTRowSetGenerator
- extends AbstractTRowSetGenerator[StructType, Array[Byte], DataType] {
+ extends TRowSetGenerator[StructType, Array[Byte], DataType] {
override def toColumnBasedSet(rows: Seq[Array[Byte]], schema: StructType):
TRowSet = {
require(schema.length == 1, "ArrowRowSetGenerator accepts only one single
byte array")
require(schema.head.dataType == BinaryType, "ArrowRowSetGenerator accepts
only BinaryType")
@@ -43,8 +42,7 @@ class SparkArrowTRowSetGenerator
case BinaryType =>
val values = new java.util.ArrayList[ByteBuffer](1)
values.add(ByteBuffer.wrap(rows.head))
- val nulls = new java.util.BitSet()
- TColumn.binaryVal(new TBinaryColumn(values, nulls))
+ TColumn.binaryVal(new TBinaryColumn(values,
ByteBuffer.wrap(Array[Byte]())))
case _ => throw new IllegalArgumentException(
s"unsupported datatype $typ, ArrowRowSetGenerator accepts only
BinaryType")
}
@@ -70,7 +68,7 @@ class SparkArrowTRowSetGenerator
throw new UnsupportedOperationException
}
- override def toTColumnValue(ordinal: Int, row: Array[Byte], types:
StructType): TColumnValue = {
+ override def toTColumnValue(row: Array[Byte], ordinal: Int, types:
StructType): TColumnValue = {
throw new UnsupportedOperationException
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SparkTRowSetGenerator.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SparkTRowSetGenerator.scala
index a35455292..1d1b5ef6a 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SparkTRowSetGenerator.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SparkTRowSetGenerator.scala
@@ -19,16 +19,13 @@ package org.apache.kyuubi.engine.spark.schema
import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.HiveResult
-import org.apache.spark.sql.execution.HiveResult.TimeFormatters
import org.apache.spark.sql.types._
-import org.apache.kyuubi.engine.schema.AbstractTRowSetGenerator
+import org.apache.kyuubi.engine.result.TRowSetGenerator
import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
-import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TTypeId._
-import org.apache.kyuubi.util.RowSetUtils.bitSetToBuffer
class SparkTRowSetGenerator
- extends AbstractTRowSetGenerator[StructType, Row, DataType] {
+ extends TRowSetGenerator[StructType, Row, DataType] {
// reused time formatters in single RowSet generation, see KYUUBI-5811
private val tf = HiveResult.getTimeFormatters
@@ -42,51 +39,43 @@ class SparkTRowSetGenerator
override def getColumnAs[T](row: Row, ordinal: Int): T =
row.getAs[T](ordinal)
override def toTColumn(rows: Seq[Row], ordinal: Int, typ: DataType): TColumn
= {
- val timeFormatters: TimeFormatters = tf
- val nulls = new java.util.BitSet()
typ match {
- case BooleanType => toTTypeColumn(BOOLEAN_TYPE, rows, ordinal)
- case ByteType => toTTypeColumn(BINARY_TYPE, rows, ordinal)
- case ShortType => toTTypeColumn(TINYINT_TYPE, rows, ordinal)
- case IntegerType => toTTypeColumn(INT_TYPE, rows, ordinal)
- case LongType => toTTypeColumn(BIGINT_TYPE, rows, ordinal)
- case FloatType => toTTypeColumn(FLOAT_TYPE, rows, ordinal)
- case DoubleType => toTTypeColumn(DOUBLE_TYPE, rows, ordinal)
- case StringType => toTTypeColumn(STRING_TYPE, rows, ordinal)
- case BinaryType => toTTypeColumn(ARRAY_TYPE, rows, ordinal)
+ case BooleanType => asBooleanTColumn(rows, ordinal)
+ case ByteType => asByteTColumn(rows, ordinal)
+ case ShortType => asShortTColumn(rows, ordinal)
+ case IntegerType => asIntegerTColumn(rows, ordinal)
+ case LongType => asLongTColumn(rows, ordinal)
+ case FloatType => asFloatTColumn(rows, ordinal)
+ case DoubleType => asDoubleTColumn(rows, ordinal)
+ case StringType => asStringTColumn(rows, ordinal)
+ case BinaryType => asByteArrayTColumn(rows, ordinal)
case _ =>
- var i = 0
- val rowSize = rows.length
- val values = new java.util.ArrayList[String](rowSize)
- while (i < rowSize) {
- val row = rows(i)
- nulls.set(i, row.isNullAt(ordinal))
- values.add(RowSet.toHiveString(row.get(ordinal) -> typ,
timeFormatters = timeFormatters))
- i += 1
- }
- TColumn.stringVal(new TStringColumn(values, nulls))
+ val timeFormatters = tf
+ asStringTColumn(
+ rows,
+ ordinal,
+ "NULL",
+ (row, ordinal) =>
+ RowSet.toHiveString(
+ getColumnAs[Any](row, ordinal) -> typ,
+ timeFormatters = timeFormatters))
}
}
- override def toTColumnValue(ordinal: Int, row: Row, types: StructType):
TColumnValue = {
- val timeFormatters: TimeFormatters = tf
+ override def toTColumnValue(row: Row, ordinal: Int, types: StructType):
TColumnValue = {
getColumnType(types, ordinal) match {
- case BooleanType => toTTypeColumnVal(BOOLEAN_TYPE, row, ordinal)
- case ByteType => toTTypeColumnVal(BINARY_TYPE, row, ordinal)
- case ShortType => toTTypeColumnVal(TINYINT_TYPE, row, ordinal)
- case IntegerType => toTTypeColumnVal(INT_TYPE, row, ordinal)
- case LongType => toTTypeColumnVal(BIGINT_TYPE, row, ordinal)
- case FloatType => toTTypeColumnVal(FLOAT_TYPE, row, ordinal)
- case DoubleType => toTTypeColumnVal(DOUBLE_TYPE, row, ordinal)
- case StringType => toTTypeColumnVal(STRING_TYPE, row, ordinal)
- case _ =>
- val tStrValue = new TStringValue
- if (!row.isNullAt(ordinal)) {
- tStrValue.setValue(RowSet.toHiveString(
- row.get(ordinal) -> types(ordinal).dataType,
- timeFormatters = timeFormatters))
- }
- TColumnValue.stringVal(tStrValue)
+ case BooleanType => asBooleanTColumnValue(row, ordinal)
+ case ByteType => asByteTColumnValue(row, ordinal)
+ case ShortType => asShortTColumnValue(row, ordinal)
+ case IntegerType => asIntegerTColumnValue(row, ordinal)
+ case LongType => asLongTColumnValue(row, ordinal)
+ case FloatType => asFloatTColumnValue(row, ordinal)
+ case DoubleType => asDoubleTColumnValue(row, ordinal)
+ case StringType => asStringTColumnValue(row, ordinal)
+ case _ => asStringTColumnValue(
+ row,
+ ordinal,
+ rawValue => RowSet.toHiveString(rawValue -> types(ordinal).dataType,
timeFormatters = tf))
}
}
diff --git
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/schema/TrinoTRowSetGenerator.scala
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/schema/TrinoTRowSetGenerator.scala
index 9c323a508..57d91b371 100644
---
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/schema/TrinoTRowSetGenerator.scala
+++
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/schema/TrinoTRowSetGenerator.scala
@@ -20,76 +20,56 @@ package org.apache.kyuubi.engine.trino.schema
import io.trino.client.{ClientTypeSignature, Column}
import io.trino.client.ClientStandardTypes._
-import org.apache.kyuubi.engine.schema.AbstractTRowSetGenerator
+import org.apache.kyuubi.engine.result.TRowSetGenerator
import org.apache.kyuubi.engine.trino.schema.RowSet.toHiveString
import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
-import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TTypeId._
-import org.apache.kyuubi.util.RowSetUtils._
class TrinoTRowSetGenerator
- extends AbstractTRowSetGenerator[Seq[Column], Seq[_], ClientTypeSignature] {
+ extends TRowSetGenerator[Seq[Column], Seq[_], ClientTypeSignature] {
override def getColumnSizeFromSchemaType(schema: Seq[Column]): Int =
schema.length
- override def getColumnType(schema: Seq[Column], ordinal: Int):
ClientTypeSignature = {
+ override def getColumnType(schema: Seq[Column], ordinal: Int):
ClientTypeSignature =
schema(ordinal).getTypeSignature
- }
- override def isColumnNullAt(row: Seq[_], ordinal: Int): Boolean =
- row(ordinal) == null
+ override def isColumnNullAt(row: Seq[_], ordinal: Int): Boolean =
row(ordinal) == null
- override def getColumnAs[T](row: Seq[_], ordinal: Int): T =
- row(ordinal).asInstanceOf[T]
+ override def getColumnAs[T](row: Seq[_], ordinal: Int): T =
row(ordinal).asInstanceOf[T]
override def toTColumn(rows: Seq[Seq[_]], ordinal: Int, typ:
ClientTypeSignature): TColumn = {
- val nulls = new java.util.BitSet()
typ.getRawType match {
- case BOOLEAN => toTTypeColumn(BOOLEAN_TYPE, rows, ordinal)
- case TINYINT => toTTypeColumn(BINARY_TYPE, rows, ordinal)
- case SMALLINT => toTTypeColumn(TINYINT_TYPE, rows, ordinal)
- case INTEGER => toTTypeColumn(INT_TYPE, rows, ordinal)
- case BIGINT => toTTypeColumn(BIGINT_TYPE, rows, ordinal)
- case REAL => toTTypeColumn(FLOAT_TYPE, rows, ordinal)
- case DOUBLE => toTTypeColumn(DOUBLE_TYPE, rows, ordinal)
- case VARCHAR => toTTypeColumn(STRING_TYPE, rows, ordinal)
- case VARBINARY => toTTypeColumn(ARRAY_TYPE, rows, ordinal)
+ case BOOLEAN => asBooleanTColumn(rows, ordinal)
+ case TINYINT => asByteTColumn(rows, ordinal)
+ case SMALLINT => asShortTColumn(rows, ordinal)
+ case INTEGER => asIntegerTColumn(rows, ordinal)
+ case BIGINT => asLongTColumn(rows, ordinal)
+ case REAL => asFloatTColumn(rows, ordinal)
+ case DOUBLE => asDoubleTColumn(rows, ordinal)
+ case VARCHAR => asStringTColumn(rows, ordinal)
+ case VARBINARY => asByteArrayTColumn(rows, ordinal)
case _ =>
- val rowSize = rows.length
- val values = new java.util.ArrayList[String](rowSize)
- var i = 0
- while (i < rowSize) {
- val row = rows(i)
- val isNull = isColumnNullAt(row, ordinal)
- nulls.set(i, isNull)
- val value = if (isNull) {
- ""
- } else {
- toHiveString(row(ordinal), typ)
- }
- values.add(value)
- i += 1
- }
- TColumn.stringVal(new TStringColumn(values, nulls))
+ asStringTColumn(
+ rows,
+ ordinal,
+ convertFunc = (row, ordinal) => toHiveString(getColumnAs[Any](row,
ordinal), typ))
}
}
- override def toTColumnValue(ordinal: Int, row: Seq[_], types: Seq[Column]):
TColumnValue = {
+ override def toTColumnValue(row: Seq[_], ordinal: Int, types: Seq[Column]):
TColumnValue = {
getColumnType(types, ordinal).getRawType match {
- case BOOLEAN => toTTypeColumnVal(BOOLEAN_TYPE, row, ordinal)
- case TINYINT => toTTypeColumnVal(BINARY_TYPE, row, ordinal)
- case SMALLINT => toTTypeColumnVal(TINYINT_TYPE, row, ordinal)
- case INTEGER => toTTypeColumnVal(INT_TYPE, row, ordinal)
- case BIGINT => toTTypeColumnVal(BIGINT_TYPE, row, ordinal)
- case REAL => toTTypeColumnVal(FLOAT_TYPE, row, ordinal)
- case DOUBLE => toTTypeColumnVal(DOUBLE_TYPE, row, ordinal)
- case VARCHAR => toTTypeColumnVal(STRING_TYPE, row, ordinal)
+ case BOOLEAN => asBooleanTColumnValue(row, ordinal)
+ case TINYINT => asByteTColumnValue(row, ordinal)
+ case SMALLINT => asShortTColumnValue(row, ordinal)
+ case INTEGER => asIntegerTColumnValue(row, ordinal)
+ case BIGINT => asLongTColumnValue(row, ordinal)
+ case REAL => asFloatTColumnValue(row, ordinal)
+ case DOUBLE => asDoubleTColumnValue(row, ordinal)
+ case VARCHAR => asStringTColumnValue(row, ordinal)
case _ =>
- val tStrValue = new TStringValue
- if (row(ordinal) != null) {
- tStrValue.setValue(
- toHiveString(row(ordinal), types(ordinal).getTypeSignature))
- }
- TColumnValue.stringVal(tStrValue)
+ asStringTColumnValue(
+ row,
+ ordinal,
+ rawValue => toHiveString(rawValue, types(ordinal).getTypeSignature))
}
}
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/result/TColumnGenerator.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/result/TColumnGenerator.scala
new file mode 100644
index 000000000..e2c8f1ea6
--- /dev/null
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/result/TColumnGenerator.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.result
+import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float
=> JFloat, Long => JLong, Short => JShort}
+import java.nio.ByteBuffer
+import java.util.{ArrayList => JArrayList, BitSet => JBitSet, List => JList}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
+
+trait TColumnGenerator[RowT] extends TRowSetColumnGetter[RowT] {
+ protected def getColumnToList[T](
+ rows: Seq[RowT],
+ ordinal: Int,
+ defaultVal: T,
+ convertFunc: (RowT, Int) => T = null): (JList[T], ByteBuffer) = {
+ val rowSize = rows.length
+ val ret = new JArrayList[T](rowSize)
+ val nulls = new JBitSet()
+ var idx = 0
+ while (idx < rowSize) {
+ val row = rows(idx)
+ val isNull = isColumnNullAt(row, ordinal)
+ if (isNull) {
+ nulls.set(idx, true)
+ ret.add(defaultVal)
+ } else {
+ val value = Option(convertFunc) match {
+ case Some(f) => f(row, ordinal)
+ case _ => getColumnAs[T](row, ordinal)
+ }
+ ret.add(value)
+ }
+ idx += 1
+ }
+ (ret, ByteBuffer.wrap(nulls.toByteArray))
+ }
+
+ def asBooleanTColumn(rows: Seq[RowT], ordinal: Int): TColumn = {
+ val (values, nulls) = getColumnToList[JBoolean](rows, ordinal, true)
+ TColumn.boolVal(new TBoolColumn(values, nulls))
+ }
+
+ def asByteTColumn(rows: Seq[RowT], ordinal: Int): TColumn = {
+ val (values, nulls) = getColumnToList[JByte](rows, ordinal, 0.toByte)
+ TColumn.byteVal(new TByteColumn(values, nulls))
+ }
+
+ def asShortTColumn(rows: Seq[RowT], ordinal: Int): TColumn = {
+ val (values, nulls) = getColumnToList[JShort](rows, ordinal, 0.toShort)
+ TColumn.i16Val(new TI16Column(values, nulls))
+ }
+
+ def asIntegerTColumn(rows: Seq[RowT], ordinal: Int): TColumn = {
+ val (values, nulls) = getColumnToList[Integer](rows, ordinal, 0)
+ TColumn.i32Val(new TI32Column(values, nulls))
+ }
+
+ def asLongTColumn(rows: Seq[RowT], ordinal: Int): TColumn = {
+ val (values, nulls) = getColumnToList[JLong](rows, ordinal, 0.toLong)
+ TColumn.i64Val(new TI64Column(values, nulls))
+ }
+
+ def asFloatTColumn(rows: Seq[RowT], ordinal: Int): TColumn = {
+ val (values, nulls) = getColumnToList[JFloat](rows, ordinal, 0.toFloat)
+ val doubleValues = values.asScala.map(f =>
JDouble.valueOf(f.toString)).asJava
+ TColumn.doubleVal(new TDoubleColumn(doubleValues, nulls))
+ }
+
+ def asDoubleTColumn(rows: Seq[RowT], ordinal: Int): TColumn = {
+ val (values, nulls) = getColumnToList[JDouble](rows, ordinal, 0.toDouble)
+ TColumn.doubleVal(new TDoubleColumn(values, nulls))
+ }
+
+ def asStringTColumn(
+ rows: Seq[RowT],
+ ordinal: Int,
+ defaultVal: String = "",
+ convertFunc: (RowT, Int) => String = null): TColumn = {
+ val (values, nulls) = getColumnToList[String](rows, ordinal, defaultVal,
convertFunc)
+ TColumn.stringVal(new TStringColumn(values, nulls))
+ }
+
+ def asByteArrayTColumn(rows: Seq[RowT], ordinal: Int): TColumn = {
+ val (values, nulls) = getColumnToList[Array[Byte]](rows, ordinal,
defaultVal = Array[Byte]())
+ val byteBufferValues = values.asScala.map(ByteBuffer.wrap).asJava
+ TColumn.binaryVal(new TBinaryColumn(byteBufferValues, nulls))
+ }
+}
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/result/TColumnValueGenerator.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/result/TColumnValueGenerator.scala
new file mode 100644
index 000000000..0ff3a250d
--- /dev/null
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/result/TColumnValueGenerator.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.result
+
+import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float
=> JFloat, Long => JLong, Short => JShort}
+
+import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
+
+trait TColumnValueGenerator[RowT] extends TRowSetColumnGetter[RowT] {
+
+ def asBooleanTColumnValue(row: RowT, ordinal: Int): TColumnValue = {
+ val tValue = new TBoolValue
+ if (!isColumnNullAt(row, ordinal)) {
+ tValue.setValue(getColumnAs[JBoolean](row, ordinal))
+ }
+ TColumnValue.boolVal(tValue)
+ }
+
+ def asByteTColumnValue(row: RowT, ordinal: Int): TColumnValue = {
+ val tValue = new TByteValue
+ if (!isColumnNullAt(row, ordinal)) {
+ tValue.setValue(getColumnAs[JByte](row, ordinal))
+ }
+ TColumnValue.byteVal(tValue)
+ }
+
+ def asShortTColumnValue(row: RowT, ordinal: Int): TColumnValue = {
+ val tValue = new TI16Value
+ if (!isColumnNullAt(row, ordinal)) {
+ tValue.setValue(getColumnAs[JShort](row, ordinal))
+ }
+ TColumnValue.i16Val(tValue)
+ }
+
+ def asIntegerTColumnValue(row: RowT, ordinal: Int): TColumnValue = {
+ val tValue = new TI32Value
+ if (!isColumnNullAt(row, ordinal)) {
+ tValue.setValue(getColumnAs[Integer](row, ordinal))
+ }
+ TColumnValue.i32Val(tValue)
+ }
+
+ def asLongTColumnValue(row: RowT, ordinal: Int): TColumnValue = {
+ val tValue = new TI64Value
+ if (!isColumnNullAt(row, ordinal)) {
+ tValue.setValue(getColumnAs[JLong](row, ordinal))
+ }
+ TColumnValue.i64Val(tValue)
+ }
+
+ def asFloatTColumnValue(row: RowT, ordinal: Int): TColumnValue = {
+ val tValue = new TDoubleValue
+ if (!isColumnNullAt(row, ordinal)) {
+ tValue.setValue(getColumnAs[JFloat](row, ordinal).toDouble)
+ }
+ TColumnValue.doubleVal(tValue)
+ }
+
+ def asDoubleTColumnValue(row: RowT, ordinal: Int): TColumnValue = {
+ val tValue = new TDoubleValue
+ if (!isColumnNullAt(row, ordinal)) {
+ tValue.setValue(getColumnAs[JDouble](row, ordinal))
+ }
+ TColumnValue.doubleVal(tValue)
+ }
+
+ def asStringTColumnValue(
+ row: RowT,
+ ordinal: Int,
+ convertFunc: Any => String = null): TColumnValue = {
+ val tValue = new TStringValue
+ if (!isColumnNullAt(row, ordinal)) {
+ val str = getColumnAs[Any](row, ordinal) match {
+ case strObj: String => strObj
+ case obj if convertFunc != null => convertFunc(obj)
+ case anyObj => String.valueOf(anyObj)
+ }
+ tValue.setValue(str)
+ }
+ TColumnValue.stringVal(tValue)
+ }
+}
diff --git
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/postgresql/PostgreSQLTRowSetGenerator.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/result/TRowSetColumnGetter.scala
similarity index 60%
copy from
externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/postgresql/PostgreSQLTRowSetGenerator.scala
copy to
kyuubi-common/src/main/scala/org/apache/kyuubi/engine/result/TRowSetColumnGetter.scala
index 0d02aa732..3f6b6a16a 100644
---
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/postgresql/PostgreSQLTRowSetGenerator.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/result/TRowSetColumnGetter.scala
@@ -14,16 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kyuubi.engine.jdbc.postgresql
-import org.apache.kyuubi.engine.jdbc.schema.DefaultJdbcTRowSetGenerator
-import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TColumn, TColumnValue}
+package org.apache.kyuubi.engine.result
-class PostgreSQLTRowSetGenerator extends DefaultJdbcTRowSetGenerator {
+trait TRowSetColumnGetter[RowT] {
+ protected def isColumnNullAt(row: RowT, ordinal: Int): Boolean
- override def toSmallIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
- toIntegerTColumn(rows, ordinal)
-
- override protected def toSmallIntTColumnValue(row: Seq[_], ordinal: Int):
TColumnValue =
- toIntegerTColumnValue(row, ordinal)
+ protected def getColumnAs[T](row: RowT, ordinal: Int): T
}
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/result/TRowSetGenerator.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/result/TRowSetGenerator.scala
new file mode 100644
index 000000000..096e45ad8
--- /dev/null
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/result/TRowSetGenerator.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.result
+import java.util.{ArrayList => JArrayList}
+
+import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
+
+trait TRowSetGenerator[SchemaT, RowT, ColumnT]
+ extends TColumnValueGenerator[RowT] with TColumnGenerator[RowT] {
+
+ def getColumnSizeFromSchemaType(schema: SchemaT): Int
+
+ def getColumnType(schema: SchemaT, ordinal: Int): ColumnT
+
+ def toTColumn(rows: Seq[RowT], ordinal: Int, typ: ColumnT): TColumn
+
+ def toTColumnValue(row: RowT, ordinal: Int, types: SchemaT): TColumnValue
+
+ def toTRowSet(rows: Seq[RowT], schema: SchemaT, protocolVersion:
TProtocolVersion): TRowSet = {
+ if (protocolVersion.getValue <
TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6.getValue) {
+ toRowBasedSet(rows, schema)
+ } else {
+ toColumnBasedSet(rows, schema)
+ }
+ }
+
+ def toRowBasedSet(rows: Seq[RowT], schema: SchemaT): TRowSet = {
+ val rowSize = rows.length
+ val tRows = new JArrayList[TRow](rowSize)
+ var i = 0
+ while (i < rowSize) {
+ val row = rows(i)
+ var j = 0
+ val columnSize = getColumnSizeFromSchemaType(schema)
+ val tColumnValues = new JArrayList[TColumnValue](columnSize)
+ while (j < columnSize) {
+ val columnValue = toTColumnValue(row, j, schema)
+ tColumnValues.add(columnValue)
+ j += 1
+ }
+ i += 1
+ val tRow = new TRow(tColumnValues)
+ tRows.add(tRow)
+ }
+ new TRowSet(0, tRows)
+ }
+
+ def toColumnBasedSet(rows: Seq[RowT], schema: SchemaT): TRowSet = {
+ val rowSize = rows.length
+ val tRowSet = new TRowSet(0, new JArrayList[TRow](rowSize))
+ var i = 0
+ val columnSize = getColumnSizeFromSchemaType(schema)
+ val tColumns = new JArrayList[TColumn](columnSize)
+ while (i < columnSize) {
+ val tColumn = toTColumn(rows, i, getColumnType(schema, i))
+ tColumns.add(tColumn)
+ i += 1
+ }
+ tRowSet.setColumns(tColumns)
+ tRowSet
+ }
+}
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/schema/AbstractTRowSetGenerator.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/schema/AbstractTRowSetGenerator.scala
deleted file mode 100644
index 3433bc2b0..000000000
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/schema/AbstractTRowSetGenerator.scala
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.engine.schema
-import java.nio.ByteBuffer
-import java.util.{ArrayList => JArrayList, BitSet => JBitSet, List => JList}
-
-import scala.collection.JavaConverters._
-
-import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
-import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TTypeId._
-import org.apache.kyuubi.util.RowSetUtils.bitSetToBuffer
-
-trait AbstractTRowSetGenerator[SchemaT, RowT, ColumnT] {
-
- protected def getColumnSizeFromSchemaType(schema: SchemaT): Int
-
- protected def getColumnType(schema: SchemaT, ordinal: Int): ColumnT
-
- protected def isColumnNullAt(row: RowT, ordinal: Int): Boolean
-
- protected def getColumnAs[T](row: RowT, ordinal: Int): T
-
- protected def toTColumn(rows: Seq[RowT], ordinal: Int, typ: ColumnT): TColumn
-
- protected def toTColumnValue(ordinal: Int, row: RowT, types: SchemaT):
TColumnValue
-
- def toTRowSet(
- rows: Seq[RowT],
- schema: SchemaT,
- protocolVersion: TProtocolVersion): TRowSet = {
- if (protocolVersion.getValue <
TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6.getValue) {
- toRowBasedSet(rows, schema)
- } else {
- toColumnBasedSet(rows, schema)
- }
- }
-
- def toRowBasedSet(rows: Seq[RowT], schema: SchemaT): TRowSet = {
- val rowSize = rows.length
- val tRows = new JArrayList[TRow](rowSize)
- var i = 0
- while (i < rowSize) {
- val row = rows(i)
- var j = 0
- val columnSize = getColumnSizeFromSchemaType(schema)
- val tColumnValues = new JArrayList[TColumnValue](columnSize)
- while (j < columnSize) {
- val columnValue = toTColumnValue(j, row, schema)
- tColumnValues.add(columnValue)
- j += 1
- }
- i += 1
- val tRow = new TRow(tColumnValues)
- tRows.add(tRow)
- }
- new TRowSet(0, tRows)
- }
-
- def toColumnBasedSet(rows: Seq[RowT], schema: SchemaT): TRowSet = {
- val rowSize = rows.length
- val tRowSet = new TRowSet(0, new JArrayList[TRow](rowSize))
- var i = 0
- val columnSize = getColumnSizeFromSchemaType(schema)
- val tColumns = new JArrayList[TColumn](columnSize)
- while (i < columnSize) {
- val tColumn = toTColumn(rows, i, getColumnType(schema, i))
- tColumns.add(tColumn)
- i += 1
- }
- tRowSet.setColumns(tColumns)
- tRowSet
- }
-
- protected def getOrSetAsNull[T](
- rows: Seq[RowT],
- ordinal: Int,
- nulls: JBitSet,
- defaultVal: T): JList[T] = {
- val size = rows.length
- val ret = new JArrayList[T](size)
- var idx = 0
- while (idx < size) {
- val row = rows(idx)
- val isNull = isColumnNullAt(row, ordinal)
- if (isNull) {
- nulls.set(idx, true)
- ret.add(defaultVal)
- } else {
- ret.add(getColumnAs[T](row, ordinal))
- }
- idx += 1
- }
- ret
- }
-
- protected def toTTypeColumnVal(typeId: TTypeId, row: RowT, ordinal: Int):
TColumnValue = {
- def isNull = isColumnNullAt(row, ordinal)
- typeId match {
- case BOOLEAN_TYPE =>
- val boolValue = new TBoolValue
- if (!isNull) boolValue.setValue(getColumnAs[java.lang.Boolean](row,
ordinal))
- TColumnValue.boolVal(boolValue)
-
- case BINARY_TYPE =>
- val byteValue = new TByteValue
- if (!isNull) byteValue.setValue(getColumnAs[java.lang.Byte](row,
ordinal))
- TColumnValue.byteVal(byteValue)
-
- case SMALLINT_TYPE | TINYINT_TYPE =>
- val tI16Value = new TI16Value
- if (!isNull) tI16Value.setValue(getColumnAs[java.lang.Short](row,
ordinal))
- TColumnValue.i16Val(tI16Value)
-
- case INT_TYPE =>
- val tI32Value = new TI32Value
- if (!isNull) tI32Value.setValue(getColumnAs[java.lang.Integer](row,
ordinal))
- TColumnValue.i32Val(tI32Value)
-
- case BIGINT_TYPE =>
- val tI64Value = new TI64Value
- if (!isNull) tI64Value.setValue(getColumnAs[java.lang.Long](row,
ordinal))
- TColumnValue.i64Val(tI64Value)
-
- case FLOAT_TYPE =>
- val tDoubleValue = new TDoubleValue
- if (!isNull) tDoubleValue.setValue(getColumnAs[java.lang.Float](row,
ordinal).toDouble)
- TColumnValue.doubleVal(tDoubleValue)
-
- case DOUBLE_TYPE =>
- val tDoubleValue = new TDoubleValue
- if (!isNull) tDoubleValue.setValue(getColumnAs[java.lang.Double](row,
ordinal))
- TColumnValue.doubleVal(tDoubleValue)
-
- case STRING_TYPE =>
- val tStringValue = new TStringValue
- if (!isNull) tStringValue.setValue(getColumnAs[String](row, ordinal))
- TColumnValue.stringVal(tStringValue)
-
- case otherType =>
- throw new UnsupportedOperationException(s"unsupported type $otherType
for toTTypeColumnVal")
- }
- }
-
- protected def toTTypeColumn(typeId: TTypeId, rows: Seq[RowT], ordinal: Int):
TColumn = {
- val nulls = new JBitSet()
- typeId match {
- case BOOLEAN_TYPE =>
- val values = getOrSetAsNull[java.lang.Boolean](rows, ordinal, nulls,
true)
- TColumn.boolVal(new TBoolColumn(values, nulls))
-
- case BINARY_TYPE =>
- val values = getOrSetAsNull[java.lang.Byte](rows, ordinal, nulls,
0.toByte)
- TColumn.byteVal(new TByteColumn(values, nulls))
-
- case SMALLINT_TYPE | TINYINT_TYPE =>
- val values = getOrSetAsNull[java.lang.Short](rows, ordinal, nulls,
0.toShort)
- TColumn.i16Val(new TI16Column(values, nulls))
-
- case INT_TYPE =>
- val values = getOrSetAsNull[java.lang.Integer](rows, ordinal, nulls, 0)
- TColumn.i32Val(new TI32Column(values, nulls))
-
- case BIGINT_TYPE =>
- val values = getOrSetAsNull[java.lang.Long](rows, ordinal, nulls, 0L)
- TColumn.i64Val(new TI64Column(values, nulls))
-
- case FLOAT_TYPE =>
- val values = getOrSetAsNull[java.lang.Float](rows, ordinal, nulls,
0.toFloat)
- .asScala.map(n => java.lang.Double.valueOf(n.toString)).asJava
- TColumn.doubleVal(new TDoubleColumn(values, nulls))
-
- case DOUBLE_TYPE =>
- val values = getOrSetAsNull[java.lang.Double](rows, ordinal, nulls,
0.toDouble)
- TColumn.doubleVal(new TDoubleColumn(values, nulls))
-
- case STRING_TYPE =>
- val values = getOrSetAsNull[java.lang.String](rows, ordinal, nulls, "")
- TColumn.stringVal(new TStringColumn(values, nulls))
-
- case CHAR_TYPE =>
- val values = getOrSetAsNull[java.lang.String](rows, ordinal, nulls, "")
- TColumn.stringVal(new TStringColumn(values, nulls))
-
- case ARRAY_TYPE =>
- val values = getOrSetAsNull[Array[Byte]](rows, ordinal, nulls, Array())
- .asScala
- .map(ByteBuffer.wrap)
- .asJava
- TColumn.binaryVal(new TBinaryColumn(values, nulls))
-
- case otherType =>
- throw new UnsupportedOperationException(s"unsupported type $otherType
for toTTypeColumnVal")
- }
- }
-}
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/RowSetUtils.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/RowSetUtils.scala
index f320fd902..c79c20327 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/RowSetUtils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/RowSetUtils.scala
@@ -17,15 +17,12 @@
package org.apache.kyuubi.util
-import java.nio.ByteBuffer
import java.time.{Instant, LocalDate, LocalDateTime, LocalTime, ZoneId}
import java.time.chrono.IsoChronology
import java.time.format.DateTimeFormatterBuilder
import java.time.temporal.ChronoField
import java.util.{Date, Locale}
-import scala.language.implicitConversions
-
import org.apache.commons.lang3.time.FastDateFormat
private[kyuubi] object RowSetUtils {
@@ -77,8 +74,4 @@ private[kyuubi] object RowSetUtils {
timeZone.map(timestampFormatter.withZone(_).format(i))
.getOrElse(timestampFormatter.format(i))
}
-
- implicit def bitSetToBuffer(bitSet: java.util.BitSet): ByteBuffer = {
- ByteBuffer.wrap(bitSet.toByteArray)
- }
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/sql/schema/ServerTRowSetGenerator.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/sql/schema/ServerTRowSetGenerator.scala
index e1a9d55a6..96294c6eb 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/sql/schema/ServerTRowSetGenerator.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/sql/schema/ServerTRowSetGenerator.scala
@@ -17,13 +17,12 @@
package org.apache.kyuubi.sql.schema
-import org.apache.kyuubi.engine.schema.AbstractTRowSetGenerator
+import org.apache.kyuubi.engine.result.TRowSetGenerator
import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TTypeId._
-import org.apache.kyuubi.util.RowSetUtils._
class ServerTRowSetGenerator
- extends AbstractTRowSetGenerator[Schema, Row, TTypeId] {
+ extends TRowSetGenerator[Schema, Row, TTypeId] {
override def getColumnSizeFromSchemaType(schema: Schema): Int = schema.length
@@ -34,44 +33,35 @@ class ServerTRowSetGenerator
override def getColumnAs[T](row: Row, ordinal: Int): T =
row.getAs[T](ordinal)
override def toTColumn(rows: Seq[Row], ordinal: Int, typ: TTypeId): TColumn
= {
- val nulls = new java.util.BitSet()
typ match {
- case t @ (BOOLEAN_TYPE | BINARY_TYPE | BINARY_TYPE | TINYINT_TYPE |
INT_TYPE |
- BIGINT_TYPE | FLOAT_TYPE | DOUBLE_TYPE | STRING_TYPE) =>
- toTTypeColumn(t, rows, ordinal)
-
+ case BOOLEAN_TYPE => asBooleanTColumn(rows, ordinal)
+ case BINARY_TYPE => asShortTColumn(rows, ordinal)
+ case TINYINT_TYPE => asShortTColumn(rows, ordinal)
+ case INT_TYPE => asIntegerTColumn(rows, ordinal)
+ case BIGINT_TYPE => asLongTColumn(rows, ordinal)
+ case FLOAT_TYPE => asFloatTColumn(rows, ordinal)
+ case DOUBLE_TYPE => asDoubleTColumn(rows, ordinal)
+ case STRING_TYPE => asStringTColumn(rows, ordinal)
case _ =>
- var i = 0
- val rowSize = rows.length
- val values = new java.util.ArrayList[String](rowSize)
- while (i < rowSize) {
- val row = rows(i)
- val isNull = isColumnNullAt(row, ordinal)
- nulls.set(i, isNull)
- val value = if (isNull) {
- ""
- } else {
- (row.get(ordinal), typ).toString()
- }
- values.add(value)
- i += 1
- }
- TColumn.stringVal(new TStringColumn(values, nulls))
+ asStringTColumn(
+ rows,
+ ordinal,
+ convertFunc = (row, ordinal) => (row.get(ordinal), typ).toString())
}
}
- override def toTColumnValue(ordinal: Int, row: Row, types: Schema):
TColumnValue = {
+ override def toTColumnValue(row: Row, ordinal: Int, types: Schema):
TColumnValue = {
getColumnType(types, ordinal) match {
- case t @ (BOOLEAN_TYPE | BINARY_TYPE | BINARY_TYPE | TINYINT_TYPE |
INT_TYPE |
- BIGINT_TYPE | FLOAT_TYPE | DOUBLE_TYPE | STRING_TYPE) =>
- toTTypeColumnVal(t, row, ordinal)
-
- case _ =>
- val tStrValue = new TStringValue
- if (!isColumnNullAt(row, ordinal)) {
- tStrValue.setValue((row.get(ordinal),
types(ordinal).dataType).toString())
- }
- TColumnValue.stringVal(tStrValue)
+ case BOOLEAN_TYPE => asBooleanTColumnValue(row, ordinal)
+ case BINARY_TYPE => asByteTColumnValue(row, ordinal)
+ case TINYINT_TYPE => asShortTColumnValue(row, ordinal)
+ case INT_TYPE => asIntegerTColumnValue(row, ordinal)
+ case BIGINT_TYPE => asLongTColumnValue(row, ordinal)
+ case FLOAT_TYPE => asFloatTColumnValue(row, ordinal)
+ case DOUBLE_TYPE => asDoubleTColumnValue(row, ordinal)
+ case STRING_TYPE => asStringTColumnValue(row, ordinal)
+ case otherType =>
+ asStringTColumnValue(row, ordinal, rawValue => (rawValue,
otherType).toString())
}
}