This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new aac4503bb [KYUUBI #5870] Directly mapping engine's data type to Java 
type for TRowSet generation
aac4503bb is described below

commit aac4503bba1640320e87f0f6cf721cb80a48af1e
Author: Bowen Liang <[email protected]>
AuthorDate: Thu Dec 21 12:16:49 2023 +0800

    [KYUUBI #5870] Directly mapping engine's data type to Java type for TRowSet 
generation
    
    # :mag: Description
    ## Issue References 🔗
    
    As described in #5870.
    
    ## Describe Your Solution 🔧
    
    Previously, the TRowSetGenerator built a mapping chain from the `Engine's data type`
    to a `Thrift data type` and then to a `Java data type`.
    This limited the ability to override the Java data type handlers and coupled the
    `Engine's data type` to a specific `Thrift data type`.
    
    1. This PR maps the `Engine's data type` directly to the `Java data type`, focusing on
    converting the upstream engine result set's values to their real data types (see the
    sketch below).
    2. It also decouples the `Thrift data type` from the Java data types, e.g. Thrift's
    SMALLINT and TINYINT can now be ignored, with the engine's data types mapped to Java's
    Short data type.
    3. It can now distinguish similarly named data types with different meanings, such as
    Java's `byte` or `byte array` versus `tiny int` or `small int` in engines and Thrift.
    Only Java data types are represented in `TColumnGenerator` and `TColumnValueGenerator`.
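    
    The sketch below (illustration only, not part of this patch) shows how an engine-side
    generator plugs into the new API. The `ExampleTRowSetGenerator`, its `Seq[Int]` schema of
    `java.sql.Types` codes, and the `Seq[Any]` row type are hypothetical; the `TRowSetGenerator`
    trait and the `as*TColumn` / `as*TColumnValue` helpers are the ones introduced here.
    
    ```scala
    import java.sql.Types
    
    import org.apache.kyuubi.engine.result.TRowSetGenerator
    import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TColumn, TColumnValue}
    
    // Hypothetical engine: the schema is a Seq of java.sql.Types codes, rows are plain Seq[Any].
    class ExampleTRowSetGenerator extends TRowSetGenerator[Seq[Int], Seq[Any], Int] {
    
      override def getColumnSizeFromSchemaType(schema: Seq[Int]): Int = schema.length
    
      override def getColumnType(schema: Seq[Int], ordinal: Int): Int = schema(ordinal)
    
      override def isColumnNullAt(row: Seq[Any], ordinal: Int): Boolean = row(ordinal) == null
    
      override def getColumnAs[T](row: Seq[Any], ordinal: Int): T = row(ordinal).asInstanceOf[T]
    
      // Engine type -> Java type, with no intermediate Thrift TTypeId: TINYINT and SMALLINT
      // both land on Java Short (the engine is assumed to store Short values in such columns).
      override def toTColumn(rows: Seq[Seq[Any]], ordinal: Int, typ: Int): TColumn =
        typ match {
          case Types.TINYINT | Types.SMALLINT => asShortTColumn(rows, ordinal)
          case Types.INTEGER => asIntegerTColumn(rows, ordinal)
          case _ => asStringTColumn(rows, ordinal, convertFunc = (row, i) => row(i).toString)
        }
    
      override def toTColumnValue(row: Seq[Any], ordinal: Int, types: Seq[Int]): TColumnValue =
        getColumnType(types, ordinal) match {
          case Types.TINYINT | Types.SMALLINT => asShortTColumnValue(row, ordinal)
          case Types.INTEGER => asIntegerTColumnValue(row, ordinal)
          case _ => asStringTColumnValue(row, ordinal)
        }
    }
    ```
    
    Calling `toTRowSet(rows, schema, protocolVersion)` on such a generator then chooses the
    row-based or column-based TRowSet encoding automatically.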
    
    ## Types of changes :bookmark:
    
    - [ ] Bugfix (non-breaking change which fixes an issue)
    - [ ] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing 
functionality to change)
    
    ## Test Plan 🧪
    
    #### Behavior Without This Pull Request :coffin:
    No behaviour changes.
    
    #### Behavior With This Pull Request :tada:
    No behaviour changes.
    
    #### Related Unit Tests
    CI tests.
    
    ---
    
    # Checklists
    ## ๐Ÿ“ Author Self Checklist
    
    - [x] My code follows the [style 
guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html)
 of this project
    - [x] I have performed a self-review
    - [x] I have commented my code, particularly in hard-to-understand areas
    - [ ] I have made corresponding changes to the documentation
    - [x] My changes generate no new warnings
    - [ ] I have added tests that prove my fix is effective or that my feature 
works
    - [x] New and existing unit tests pass locally with my changes
    - [x] This patch was not authored or co-authored using [Generative 
Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    ## ๐Ÿ“ Committer Pre-Merge Checklist
    
    - [x] Pull request title is okay.
    - [x] No license issues.
    - [x] Milestone correctly set?
    - [x] Test coverage is ok
    - [x] Assignees are selected.
    - [x] Minimum number of approvals
    - [x] No changes are requested
    
    **Be nice. Be informative.**
    
    Closes #5870 from bowenliang123/rowset-map.
    
    Closes #5870
    
    51740a00a [Bowen Liang] update
    3dc48f4b9 [liangbowen] type mapping
    
    Lead-authored-by: Bowen Liang <[email protected]>
    Co-authored-by: liangbowen <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../engine/chat/schema/ChatTRowSetGenerator.scala  |  14 +-
 .../flink/schema/FlinkTRowSetGenerator.scala       | 135 ++++++-------
 .../engine/jdbc/dialect/PhoenixDialect.scala       |   6 +-
 .../engine/jdbc/doris/DorisTRowSetGenerator.scala  |  18 +-
 .../engine/jdbc/mysql/MySQLTRowSetGenerator.scala  |  23 +--
 .../PhoenixTRowSetGenerator.scala}                 |  12 +-
 .../postgresql/PostgreSQLTRowSetGenerator.scala    |   2 +-
 .../jdbc/schema/DefaultJdbcTRowSetGenerator.scala  | 115 +++++------
 .../engine/jdbc/schema/JdbcTRowSetGenerator.scala  |   9 +-
 .../spark/schema/SparkArrowTRowSetGenerator.scala  |  10 +-
 .../spark/schema/SparkTRowSetGenerator.scala       |  77 ++++----
 .../trino/schema/TrinoTRowSetGenerator.scala       |  82 +++-----
 .../kyuubi/engine/result/TColumnGenerator.scala    | 105 +++++++++++
 .../engine/result/TColumnValueGenerator.scala      |  97 ++++++++++
 .../kyuubi/engine/result/TRowSetColumnGetter.scala |  13 +-
 .../kyuubi/engine/result/TRowSetGenerator.scala    |  77 ++++++++
 .../engine/schema/AbstractTRowSetGenerator.scala   | 210 ---------------------
 .../scala/org/apache/kyuubi/util/RowSetUtils.scala |   7 -
 .../kyuubi/sql/schema/ServerTRowSetGenerator.scala |  60 +++---
 19 files changed, 506 insertions(+), 566 deletions(-)

diff --git 
a/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/schema/ChatTRowSetGenerator.scala
 
b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/schema/ChatTRowSetGenerator.scala
index 990a19764..7e6a121be 100644
--- 
a/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/schema/ChatTRowSetGenerator.scala
+++ 
b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/schema/ChatTRowSetGenerator.scala
@@ -18,31 +18,29 @@
 package org.apache.kyuubi.engine.chat.schema
 
 import org.apache.kyuubi.engine.chat.schema.ChatTRowSetGenerator._
-import org.apache.kyuubi.engine.schema.AbstractTRowSetGenerator
+import org.apache.kyuubi.engine.result.TRowSetGenerator
 import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
-import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TTypeId._
 
 class ChatTRowSetGenerator
-  extends AbstractTRowSetGenerator[Seq[String], Seq[String], String] {
+  extends TRowSetGenerator[Seq[String], Seq[String], String] {
 
   override def getColumnSizeFromSchemaType(schema: Seq[String]): Int = 
schema.length
 
   override def getColumnType(schema: Seq[String], ordinal: Int): String = 
COL_STRING_TYPE
 
-  override protected def isColumnNullAt(row: Seq[String], ordinal: Int): 
Boolean =
-    row(ordinal) == null
+  override def isColumnNullAt(row: Seq[String], ordinal: Int): Boolean = 
row(ordinal) == null
 
   override def getColumnAs[T](row: Seq[String], ordinal: Int): T = 
row(ordinal).asInstanceOf[T]
 
   override def toTColumn(rows: Seq[Seq[String]], ordinal: Int, typ: String): 
TColumn =
     typ match {
-      case COL_STRING_TYPE => toTTypeColumn(STRING_TYPE, rows, ordinal)
+      case COL_STRING_TYPE => asStringTColumn(rows, ordinal)
       case otherType => throw new UnsupportedOperationException(s"type 
$otherType")
     }
 
-  override def toTColumnValue(ordinal: Int, row: Seq[String], types: 
Seq[String]): TColumnValue =
+  override def toTColumnValue(row: Seq[String], ordinal: Int, types: 
Seq[String]): TColumnValue =
     getColumnType(types, ordinal) match {
-      case "String" => toTTypeColumnVal(STRING_TYPE, row, ordinal)
+      case COL_STRING_TYPE => asStringTColumnValue(row, ordinal)
       case otherType => throw new UnsupportedOperationException(s"type 
$otherType")
     }
 }
diff --git 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/FlinkTRowSetGenerator.scala
 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/FlinkTRowSetGenerator.scala
index b53aab47f..463b66111 100644
--- 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/FlinkTRowSetGenerator.scala
+++ 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/FlinkTRowSetGenerator.scala
@@ -18,21 +18,17 @@ package org.apache.kyuubi.engine.flink.schema
 
 import java.time.{Instant, ZonedDateTime, ZoneId}
 
-import scala.collection.JavaConverters._
-
 import org.apache.flink.table.data.StringData
 import org.apache.flink.table.types.logical._
 import org.apache.flink.types.Row
 
 import org.apache.kyuubi.engine.flink.result.ResultSet
 import org.apache.kyuubi.engine.flink.schema.RowSet.{toHiveString, 
TIMESTAMP_LZT_FORMATTER}
-import org.apache.kyuubi.engine.schema.AbstractTRowSetGenerator
+import org.apache.kyuubi.engine.result.TRowSetGenerator
 import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
-import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TTypeId._
-import org.apache.kyuubi.util.RowSetUtils.bitSetToBuffer
 
 class FlinkTRowSetGenerator(zoneId: ZoneId)
-  extends AbstractTRowSetGenerator[ResultSet, Row, LogicalType] {
+  extends TRowSetGenerator[ResultSet, Row, LogicalType] {
   override def getColumnSizeFromSchemaType(schema: ResultSet): Int = 
schema.columns.size
 
   override def getColumnType(schema: ResultSet, ordinal: Int): LogicalType =
@@ -42,99 +38,74 @@ class FlinkTRowSetGenerator(zoneId: ZoneId)
 
   override def getColumnAs[T](row: Row, ordinal: Int): T = 
row.getFieldAs[T](ordinal)
 
-  override def toTColumnValue(ordinal: Int, row: Row, types: ResultSet): 
TColumnValue = {
+  override def toTColumnValue(row: Row, ordinal: Int, types: ResultSet): 
TColumnValue = {
     getColumnType(types, ordinal) match {
-      case _: BooleanType => toTTypeColumnVal(BOOLEAN_TYPE, row, ordinal)
-      case _: TinyIntType => toTTypeColumnVal(BINARY_TYPE, row, ordinal)
-      case _: SmallIntType => toTTypeColumnVal(TINYINT_TYPE, row, ordinal)
-      case _: IntType => toTTypeColumnVal(INT_TYPE, row, ordinal)
-      case _: BigIntType => toTTypeColumnVal(BIGINT_TYPE, row, ordinal)
-      case _: DoubleType => toTTypeColumnVal(DOUBLE_TYPE, row, ordinal)
-      case _: FloatType => toTTypeColumnVal(FLOAT_TYPE, row, ordinal)
+      case _: BooleanType => asBooleanTColumnValue(row, ordinal)
+      case _: TinyIntType => asByteTColumnValue(row, ordinal)
+      case _: SmallIntType => asShortTColumnValue(row, ordinal)
+      case _: IntType => asIntegerTColumnValue(row, ordinal)
+      case _: BigIntType => asLongTColumnValue(row, ordinal)
+      case _: DoubleType => asDoubleTColumnValue(row, ordinal)
+      case _: FloatType => asFloatTColumnValue(row, ordinal)
       case t @ (_: VarCharType | _: CharType) =>
-        val tStringValue = new TStringValue
-        val fieldValue = row.getField(ordinal)
-        fieldValue match {
-          case value: String =>
-            tStringValue.setValue(value)
-          case value: StringData =>
-            tStringValue.setValue(value.toString)
-          case null =>
-            tStringValue.setValue(null)
-          case other =>
-            throw new IllegalArgumentException(
-              s"Unsupported conversion class ${other.getClass} " +
-                s"for type ${t.getClass}.")
-        }
-        TColumnValue.stringVal(tStringValue)
+        asStringTColumnValue(
+          row,
+          ordinal,
+          convertFunc = {
+            case value: String => value
+            case value: StringData => value.toString
+            case null => null
+            case other => throw new IllegalArgumentException(
+                s"Unsupported conversion class ${other.getClass} for type 
${t.getClass}.")
+          })
       case _: LocalZonedTimestampType =>
-        val tStringValue = new TStringValue
-        val fieldValue = row.getField(ordinal)
-        tStringValue.setValue(TIMESTAMP_LZT_FORMATTER.format(
-          ZonedDateTime.ofInstant(fieldValue.asInstanceOf[Instant], zoneId)))
-        TColumnValue.stringVal(tStringValue)
-      case t =>
-        val tStringValue = new TStringValue
-        if (row.getField(ordinal) != null) {
-          tStringValue.setValue(toHiveString((row.getField(ordinal), t)))
-        }
-        TColumnValue.stringVal(tStringValue)
+        asStringTColumnValue(
+          row,
+          ordinal,
+          rawValue =>
+            TIMESTAMP_LZT_FORMATTER.format(
+              ZonedDateTime.ofInstant(rawValue.asInstanceOf[Instant], zoneId)))
+      case t => asStringTColumnValue(row, ordinal, rawValue => 
toHiveString((rawValue, t)))
     }
   }
 
   override def toTColumn(rows: Seq[Row], ordinal: Int, logicalType: 
LogicalType): TColumn = {
-    val nulls = new java.util.BitSet()
     // for each column, determine the conversion class by sampling the first 
non-value value
     // if there's no row, set the entire column empty
-    val sampleField = rows.iterator.map(_.getField(ordinal)).find(_ ne 
null).orNull
     logicalType match {
-      case _: BooleanType => toTTypeColumn(BOOLEAN_TYPE, rows, ordinal)
-      case _: TinyIntType => toTTypeColumn(BINARY_TYPE, rows, ordinal)
-      case _: SmallIntType => toTTypeColumn(TINYINT_TYPE, rows, ordinal)
-      case _: IntType => toTTypeColumn(INT_TYPE, rows, ordinal)
-      case _: BigIntType => toTTypeColumn(BIGINT_TYPE, rows, ordinal)
-      case _: FloatType => toTTypeColumn(FLOAT_TYPE, rows, ordinal)
-      case _: DoubleType => toTTypeColumn(DOUBLE_TYPE, rows, ordinal)
+      case _: BooleanType => asBooleanTColumn(rows, ordinal)
+      case _: TinyIntType => asByteTColumn(rows, ordinal)
+      case _: SmallIntType => asShortTColumn(rows, ordinal)
+      case _: IntType => asIntegerTColumn(rows, ordinal)
+      case _: BigIntType => asLongTColumn(rows, ordinal)
+      case _: FloatType => asFloatTColumn(rows, ordinal)
+      case _: DoubleType => asDoubleTColumn(rows, ordinal)
       case t @ (_: VarCharType | _: CharType) =>
-        val values: java.util.List[String] = new java.util.ArrayList[String](0)
+        val sampleField = rows.iterator.map(_.getField(ordinal)).find(_ ne 
null).orNull
         sampleField match {
-          case _: String =>
-            values.addAll(getOrSetAsNull[String](rows, ordinal, nulls, ""))
+          case _: String => asStringTColumn(rows, ordinal)
           case _: StringData =>
-            val stringDataValues =
-              getOrSetAsNull[StringData](rows, ordinal, nulls, 
StringData.fromString(""))
-            stringDataValues.forEach(e => values.add(e.toString))
-          case null =>
-            values.addAll(getOrSetAsNull[String](rows, ordinal, nulls, ""))
-          case other =>
-            throw new IllegalArgumentException(
-              s"Unsupported conversion class ${other.getClass} " +
-                s"for type ${t.getClass}.")
+            asStringTColumn(
+              rows,
+              ordinal,
+              convertFunc = (row, ordinal) => getColumnAs[StringData](row, 
ordinal).toString)
+          case null => asStringTColumn(rows, ordinal)
+          case other => throw new IllegalArgumentException(
+              s"Unsupported conversion class ${other.getClass} for type 
${t.getClass}.")
         }
-        TColumn.stringVal(new TStringColumn(values, nulls))
       case _: LocalZonedTimestampType =>
-        val values = getOrSetAsNull[Instant](rows, ordinal, nulls, 
Instant.EPOCH)
-          .toArray().map(v =>
+        asStringTColumn(
+          rows,
+          ordinal,
+          
TIMESTAMP_LZT_FORMATTER.format(ZonedDateTime.ofInstant(Instant.EPOCH, zoneId)),
+          (row, ordinal) =>
             TIMESTAMP_LZT_FORMATTER.format(
-              ZonedDateTime.ofInstant(v.asInstanceOf[Instant], zoneId)))
-        TColumn.stringVal(new TStringColumn(values.toList.asJava, nulls))
+              ZonedDateTime.ofInstant(getColumnAs[Instant](row, ordinal), 
zoneId)))
       case _ =>
-        var i = 0
-        val rowSize = rows.length
-        val values = new java.util.ArrayList[String](rowSize)
-        while (i < rowSize) {
-          val row = rows(i)
-          nulls.set(i, row.getField(ordinal) == null)
-          val value =
-            if (row.getField(ordinal) == null) {
-              ""
-            } else {
-              toHiveString((row.getField(ordinal), logicalType))
-            }
-          values.add(value)
-          i += 1
-        }
-        TColumn.stringVal(new TStringColumn(values, nulls))
+        asStringTColumn(
+          rows,
+          ordinal,
+          convertFunc = (row, ordinal) => toHiveString((row.getField(ordinal), 
logicalType)))
     }
   }
 
diff --git 
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/PhoenixDialect.scala
 
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/PhoenixDialect.scala
index e12f5d04b..61440ac50 100644
--- 
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/PhoenixDialect.scala
+++ 
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/PhoenixDialect.scala
@@ -22,8 +22,8 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.commons.lang3.StringUtils
 
-import org.apache.kyuubi.engine.jdbc.phoenix.PhoenixSchemaHelper
-import org.apache.kyuubi.engine.jdbc.schema.{DefaultJdbcTRowSetGenerator, 
JdbcTRowSetGenerator, SchemaHelper}
+import org.apache.kyuubi.engine.jdbc.phoenix.{PhoenixSchemaHelper, 
PhoenixTRowSetGenerator}
+import org.apache.kyuubi.engine.jdbc.schema.{JdbcTRowSetGenerator, 
SchemaHelper}
 import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
 import org.apache.kyuubi.session.Session
 
@@ -100,7 +100,7 @@ class PhoenixDialect extends JdbcDialect {
     query.toString()
   }
 
-  override def getTRowSetGenerator(): JdbcTRowSetGenerator = new 
DefaultJdbcTRowSetGenerator
+  override def getTRowSetGenerator(): JdbcTRowSetGenerator = new 
PhoenixTRowSetGenerator
 
   override def getSchemaHelper(): SchemaHelper = {
     new PhoenixSchemaHelper
diff --git 
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/doris/DorisTRowSetGenerator.scala
 
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/doris/DorisTRowSetGenerator.scala
index 26c64d81b..b77a7b310 100644
--- 
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/doris/DorisTRowSetGenerator.scala
+++ 
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/doris/DorisTRowSetGenerator.scala
@@ -16,20 +16,6 @@
  */
 package org.apache.kyuubi.engine.jdbc.doris
 
-import org.apache.kyuubi.engine.jdbc.schema.DefaultJdbcTRowSetGenerator
-import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TColumn, TColumnValue}
+import org.apache.kyuubi.engine.jdbc.mysql.MySQLTRowSetGenerator
 
-class DorisTRowSetGenerator extends DefaultJdbcTRowSetGenerator {
-
-  override def toTinyIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
-    toIntegerTColumn(rows, ordinal)
-
-  override def toSmallIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
-    toIntegerTColumn(rows, ordinal)
-
-  override def toTinyIntTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
-    toIntegerTColumnValue(row, ordinal)
-
-  override def toSmallIntTColumnValue(row: Seq[_], ordinal: Int): TColumnValue 
=
-    toIntegerTColumnValue(row, ordinal)
-}
+class DorisTRowSetGenerator extends MySQLTRowSetGenerator {}
diff --git 
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/mysql/MySQLTRowSetGenerator.scala
 
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/mysql/MySQLTRowSetGenerator.scala
index 1ed605370..c029131fa 100644
--- 
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/mysql/MySQLTRowSetGenerator.scala
+++ 
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/mysql/MySQLTRowSetGenerator.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.kyuubi.engine.jdbc.mysql
 
+import java.lang.{Long => JLong}
 import java.sql.Types
 
 import org.apache.kyuubi.engine.jdbc.schema.DefaultJdbcTRowSetGenerator
@@ -24,45 +25,45 @@ import 
org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TColumn, TColumnValue}
 class MySQLTRowSetGenerator extends DefaultJdbcTRowSetGenerator {
 
   override def toTinyIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
-    toIntegerTColumn(rows, ordinal)
+    asIntegerTColumn(rows, ordinal)
 
   override def toSmallIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
-    toIntegerTColumn(rows, ordinal)
+    asIntegerTColumn(rows, ordinal)
 
   override def toTinyIntTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
-    toIntegerTColumnValue(row, ordinal)
+    asIntegerTColumnValue(row, ordinal)
 
   override def toSmallIntTColumnValue(row: Seq[_], ordinal: Int): TColumnValue 
=
-    toIntegerTColumnValue(row, ordinal)
+    asIntegerTColumnValue(row, ordinal)
 
   override def toIntegerTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn = {
     val colHead = if (rows.isEmpty) None else rows.head(ordinal)
     colHead match {
       case _: Integer => super.toIntegerTColumn(rows, ordinal)
-      case _: java.lang.Long => super.toBigIntTColumn(rows, ordinal)
+      case _: JLong => super.toBigIntTColumn(rows, ordinal)
       case _ => super.toDefaultTColumn(rows, ordinal, Types.INTEGER)
     }
   }
 
-  override protected def toIntegerTColumnValue(row: Seq[_], ordinal: Int): 
TColumnValue = {
+  override def toIntegerTColumnValue(row: Seq[_], ordinal: Int): TColumnValue 
= {
     row(ordinal) match {
       case _: Integer => super.toIntegerTColumnValue(row, ordinal)
-      case _: java.lang.Long => super.toBigIntTColumnValue(row, ordinal)
+      case _: JLong => super.toBigIntTColumnValue(row, ordinal)
       case _ => super.toDefaultTColumnValue(row, ordinal, Types.INTEGER)
     }
   }
 
-  override protected def toBigIntTColumn(rows: Seq[Seq[_]], ordinal: Int): 
TColumn = {
+  override def toBigIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn = {
     val colHead = if (rows.isEmpty) None else rows.head(ordinal)
     colHead match {
-      case v: java.lang.Long => super.toBigIntTColumn(rows, ordinal)
+      case _: JLong => super.toBigIntTColumn(rows, ordinal)
       case _ => super.toDefaultTColumn(rows, ordinal, Types.BIGINT)
     }
   }
 
-  override protected def toBigIntTColumnValue(row: Seq[_], ordinal: Int): 
TColumnValue =
+  override def toBigIntTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
     row(ordinal) match {
-      case v: java.lang.Long => super.toBigIntTColumnValue(row, ordinal)
+      case _: JLong => super.toBigIntTColumnValue(row, ordinal)
       case _ => super.toDefaultTColumnValue(row, ordinal, Types.BIGINT)
     }
 }
diff --git 
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/postgresql/PostgreSQLTRowSetGenerator.scala
 
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixTRowSetGenerator.scala
similarity index 66%
copy from 
externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/postgresql/PostgreSQLTRowSetGenerator.scala
copy to 
externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixTRowSetGenerator.scala
index 0d02aa732..f8740fce4 100644
--- 
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/postgresql/PostgreSQLTRowSetGenerator.scala
+++ 
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/phoenix/PhoenixTRowSetGenerator.scala
@@ -14,16 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kyuubi.engine.jdbc.postgresql
+package org.apache.kyuubi.engine.jdbc.phoenix
 
 import org.apache.kyuubi.engine.jdbc.schema.DefaultJdbcTRowSetGenerator
-import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TColumn, TColumnValue}
 
-class PostgreSQLTRowSetGenerator extends DefaultJdbcTRowSetGenerator {
-
-  override def toSmallIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
-    toIntegerTColumn(rows, ordinal)
-
-  override protected def toSmallIntTColumnValue(row: Seq[_], ordinal: Int): 
TColumnValue =
-    toIntegerTColumnValue(row, ordinal)
-}
+class PhoenixTRowSetGenerator extends DefaultJdbcTRowSetGenerator {}
diff --git 
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/postgresql/PostgreSQLTRowSetGenerator.scala
 
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/postgresql/PostgreSQLTRowSetGenerator.scala
index 0d02aa732..104b3b15d 100644
--- 
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/postgresql/PostgreSQLTRowSetGenerator.scala
+++ 
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/postgresql/PostgreSQLTRowSetGenerator.scala
@@ -24,6 +24,6 @@ class PostgreSQLTRowSetGenerator extends 
DefaultJdbcTRowSetGenerator {
   override def toSmallIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
     toIntegerTColumn(rows, ordinal)
 
-  override protected def toSmallIntTColumnValue(row: Seq[_], ordinal: Int): 
TColumnValue =
+  override def toSmallIntTColumnValue(row: Seq[_], ordinal: Int): TColumnValue 
=
     toIntegerTColumnValue(row, ordinal)
 }
diff --git 
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/DefaultJdbcTRowSetGenerator.scala
 
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/DefaultJdbcTRowSetGenerator.scala
index a2ad762be..2621379c1 100644
--- 
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/DefaultJdbcTRowSetGenerator.scala
+++ 
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/DefaultJdbcTRowSetGenerator.scala
@@ -19,11 +19,9 @@ package org.apache.kyuubi.engine.jdbc.schema
 import java.sql.Date
 import java.sql.Types._
 import java.time.LocalDateTime
-import java.util
 
 import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
-import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TTypeId._
-import org.apache.kyuubi.util.RowSetUtils.{bitSetToBuffer, formatDate, 
formatLocalDateTime}
+import org.apache.kyuubi.util.RowSetUtils.{formatDate, formatLocalDateTime}
 
 class DefaultJdbcTRowSetGenerator extends JdbcTRowSetGenerator {
 
@@ -41,7 +39,7 @@ class DefaultJdbcTRowSetGenerator extends 
JdbcTRowSetGenerator {
       case _ => toDefaultTColumn(rows, ordinal, sqlType)
     }
 
-  override def toTColumnValue(ordinal: Int, row: Seq[_], types: Seq[Column]): 
TColumnValue =
+  override def toTColumnValue(row: Seq[_], ordinal: Int, types: Seq[Column]): 
TColumnValue = {
     getColumnType(types, ordinal) match {
       case BIT => toBitTColumnValue(row, ordinal)
       case TINYINT => toTinyIntTColumnValue(row, ordinal)
@@ -54,93 +52,74 @@ class DefaultJdbcTRowSetGenerator extends 
JdbcTRowSetGenerator {
       case VARCHAR => toVarcharTColumnValue(row, ordinal)
       case otherType => toDefaultTColumnValue(row, ordinal, otherType)
     }
-
-  protected def toDefaultTColumn(rows: Seq[Seq[_]], ordinal: Int, sqlType: 
Int): TColumn = {
-    val nulls = new java.util.BitSet()
-    val rowSize = rows.length
-    val values = new util.ArrayList[String](rowSize)
-    var i = 0
-    while (i < rowSize) {
-      val row = rows(i)
-      nulls.set(i, row(ordinal) == null)
-      val value =
-        if (row(ordinal) == null) {
-          ""
-        } else {
-          toHiveString(row(ordinal), sqlType)
-        }
-      values.add(value)
-      i += 1
-    }
-    TColumn.stringVal(new TStringColumn(values, nulls))
   }
 
-  protected def toBitTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
-    toTTypeColumn(BOOLEAN_TYPE, rows, ordinal)
+  def toDefaultTColumn(rows: Seq[Seq[_]], ordinal: Int, sqlType: Int): TColumn 
=
+    asStringTColumn(
+      rows,
+      ordinal,
+      convertFunc = (row, ordinal) => toHiveString(row(ordinal), sqlType))
+
+  def toBitTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
+    asBooleanTColumn(rows, ordinal)
 
-  protected def toTinyIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
-    toTTypeColumn(TINYINT_TYPE, rows, ordinal)
+  def toTinyIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
+    asShortTColumn(rows, ordinal)
 
-  protected def toSmallIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
-    toTTypeColumn(SMALLINT_TYPE, rows, ordinal)
+  def toSmallIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
+    asShortTColumn(rows, ordinal)
 
-  protected def toIntegerTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
-    toTTypeColumn(INT_TYPE, rows, ordinal)
+  def toIntegerTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
+    asIntegerTColumn(rows, ordinal)
 
-  protected def toBigIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
-    toTTypeColumn(BIGINT_TYPE, rows, ordinal)
+  def toBigIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
+    asLongTColumn(rows, ordinal)
 
-  protected def toRealTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
-    toTTypeColumn(FLOAT_TYPE, rows, ordinal)
+  def toRealTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
+    asFloatTColumn(rows, ordinal)
 
-  protected def toDoubleTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
-    toTTypeColumn(DOUBLE_TYPE, rows, ordinal)
+  def toDoubleTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
+    asDoubleTColumn(rows, ordinal)
 
-  protected def toCharTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
-    toTTypeColumn(CHAR_TYPE, rows, ordinal)
+  def toCharTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
+    asStringTColumn(rows, ordinal)
 
-  protected def toVarcharTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
-    toTTypeColumn(STRING_TYPE, rows, ordinal)
+  def toVarcharTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
+    asStringTColumn(rows, ordinal)
 
   // ==========================================================
 
-  protected def toBitTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
-    toTTypeColumnVal(BOOLEAN_TYPE, row, ordinal)
+  def toBitTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
+    asBooleanTColumnValue(row, ordinal)
 
-  protected def toTinyIntTColumnValue(row: Seq[_], ordinal: Int): TColumnValue 
=
-    toTTypeColumnVal(TINYINT_TYPE, row, ordinal)
+  def toTinyIntTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
+    asShortTColumnValue(row, ordinal)
 
-  protected def toSmallIntTColumnValue(row: Seq[_], ordinal: Int): 
TColumnValue =
-    toTTypeColumnVal(SMALLINT_TYPE, row, ordinal)
+  def toSmallIntTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
+    asShortTColumnValue(row, ordinal)
 
-  protected def toIntegerTColumnValue(row: Seq[_], ordinal: Int): TColumnValue 
=
-    toTTypeColumnVal(INT_TYPE, row, ordinal)
+  def toIntegerTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
+    asIntegerTColumnValue(row, ordinal)
 
-  protected def toBigIntTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
-    toTTypeColumnVal(BIGINT_TYPE, row, ordinal)
+  def toBigIntTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
+    asLongTColumnValue(row, ordinal)
 
-  protected def toRealTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
-    toTTypeColumnVal(FLOAT_TYPE, row, ordinal)
+  def toRealTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
+    asFloatTColumnValue(row, ordinal)
 
-  protected def toDoubleTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
-    toTTypeColumnVal(DOUBLE_TYPE, row, ordinal)
+  def toDoubleTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
+    asDoubleTColumnValue(row, ordinal)
 
-  protected def toCharTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
-    toTTypeColumnVal(STRING_TYPE, row, ordinal)
+  def toCharTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
+    asStringTColumnValue(row, ordinal)
 
-  protected def toVarcharTColumnValue(row: Seq[_], ordinal: Int): TColumnValue 
=
-    toTTypeColumnVal(STRING_TYPE, row, ordinal)
+  def toVarcharTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
+    asStringTColumnValue(row, ordinal)
 
-  protected def toDefaultTColumnValue(row: Seq[_], ordinal: Int, sqlType: 
Int): TColumnValue = {
-    val tStrValue = new TStringValue
-    if (row(ordinal) != null) {
-      tStrValue.setValue(
-        toHiveString(row(ordinal), sqlType))
-    }
-    TColumnValue.stringVal(tStrValue)
-  }
+  def toDefaultTColumnValue(row: Seq[_], ordinal: Int, sqlType: Int): 
TColumnValue =
+    asStringTColumnValue(row, ordinal, rawValue => toHiveString(rawValue, 
sqlType))
 
-  protected def toHiveString(data: Any, sqlType: Int): String =
+  def toHiveString(data: Any, sqlType: Int): String =
     (data, sqlType) match {
       case (date: Date, DATE) => formatDate(date)
       case (dateTime: LocalDateTime, TIMESTAMP) => 
formatLocalDateTime(dateTime)
diff --git 
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/JdbcTRowSetGenerator.scala
 
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/JdbcTRowSetGenerator.scala
index 519ac5f79..233a6a799 100644
--- 
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/JdbcTRowSetGenerator.scala
+++ 
b/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/schema/JdbcTRowSetGenerator.scala
@@ -16,13 +16,12 @@
  */
 package org.apache.kyuubi.engine.jdbc.schema
 
-import org.apache.kyuubi.engine.schema.AbstractTRowSetGenerator
+import org.apache.kyuubi.engine.result.TRowSetGenerator
 
-trait JdbcTRowSetGenerator extends AbstractTRowSetGenerator[Seq[Column], 
Seq[_], Int] {
-  override protected def getColumnSizeFromSchemaType(schema: Seq[Column]): Int 
= schema.length
+trait JdbcTRowSetGenerator extends TRowSetGenerator[Seq[Column], Seq[_], Int] {
+  override def getColumnSizeFromSchemaType(schema: Seq[Column]): Int = 
schema.length
 
-  override protected def getColumnType(schema: Seq[Column], ordinal: Int): Int 
=
-    schema(ordinal).sqlType
+  override def getColumnType(schema: Seq[Column], ordinal: Int): Int = 
schema(ordinal).sqlType
 
   override protected def isColumnNullAt(row: Seq[_], ordinal: Int): Boolean = 
row(ordinal) == null
 
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SparkArrowTRowSetGenerator.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SparkArrowTRowSetGenerator.scala
index ded022ad0..054df0dd6 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SparkArrowTRowSetGenerator.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SparkArrowTRowSetGenerator.scala
@@ -21,12 +21,11 @@ import java.nio.ByteBuffer
 
 import org.apache.spark.sql.types._
 
-import org.apache.kyuubi.engine.schema.AbstractTRowSetGenerator
+import org.apache.kyuubi.engine.result.TRowSetGenerator
 import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
-import org.apache.kyuubi.util.RowSetUtils.bitSetToBuffer
 
 class SparkArrowTRowSetGenerator
-  extends AbstractTRowSetGenerator[StructType, Array[Byte], DataType] {
+  extends TRowSetGenerator[StructType, Array[Byte], DataType] {
   override def toColumnBasedSet(rows: Seq[Array[Byte]], schema: StructType): 
TRowSet = {
     require(schema.length == 1, "ArrowRowSetGenerator accepts only one single 
byte array")
     require(schema.head.dataType == BinaryType, "ArrowRowSetGenerator accepts 
only BinaryType")
@@ -43,8 +42,7 @@ class SparkArrowTRowSetGenerator
       case BinaryType =>
         val values = new java.util.ArrayList[ByteBuffer](1)
         values.add(ByteBuffer.wrap(rows.head))
-        val nulls = new java.util.BitSet()
-        TColumn.binaryVal(new TBinaryColumn(values, nulls))
+        TColumn.binaryVal(new TBinaryColumn(values, 
ByteBuffer.wrap(Array[Byte]())))
       case _ => throw new IllegalArgumentException(
           s"unsupported datatype $typ, ArrowRowSetGenerator accepts only 
BinaryType")
     }
@@ -70,7 +68,7 @@ class SparkArrowTRowSetGenerator
     throw new UnsupportedOperationException
   }
 
-  override def toTColumnValue(ordinal: Int, row: Array[Byte], types: 
StructType): TColumnValue = {
+  override def toTColumnValue(row: Array[Byte], ordinal: Int, types: 
StructType): TColumnValue = {
     throw new UnsupportedOperationException
   }
 
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SparkTRowSetGenerator.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SparkTRowSetGenerator.scala
index a35455292..1d1b5ef6a 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SparkTRowSetGenerator.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SparkTRowSetGenerator.scala
@@ -19,16 +19,13 @@ package org.apache.kyuubi.engine.spark.schema
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.HiveResult
-import org.apache.spark.sql.execution.HiveResult.TimeFormatters
 import org.apache.spark.sql.types._
 
-import org.apache.kyuubi.engine.schema.AbstractTRowSetGenerator
+import org.apache.kyuubi.engine.result.TRowSetGenerator
 import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
-import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TTypeId._
-import org.apache.kyuubi.util.RowSetUtils.bitSetToBuffer
 
 class SparkTRowSetGenerator
-  extends AbstractTRowSetGenerator[StructType, Row, DataType] {
+  extends TRowSetGenerator[StructType, Row, DataType] {
 
   // reused time formatters in single RowSet generation, see KYUUBI-5811
   private val tf = HiveResult.getTimeFormatters
@@ -42,51 +39,43 @@ class SparkTRowSetGenerator
   override def getColumnAs[T](row: Row, ordinal: Int): T = 
row.getAs[T](ordinal)
 
   override def toTColumn(rows: Seq[Row], ordinal: Int, typ: DataType): TColumn 
= {
-    val timeFormatters: TimeFormatters = tf
-    val nulls = new java.util.BitSet()
     typ match {
-      case BooleanType => toTTypeColumn(BOOLEAN_TYPE, rows, ordinal)
-      case ByteType => toTTypeColumn(BINARY_TYPE, rows, ordinal)
-      case ShortType => toTTypeColumn(TINYINT_TYPE, rows, ordinal)
-      case IntegerType => toTTypeColumn(INT_TYPE, rows, ordinal)
-      case LongType => toTTypeColumn(BIGINT_TYPE, rows, ordinal)
-      case FloatType => toTTypeColumn(FLOAT_TYPE, rows, ordinal)
-      case DoubleType => toTTypeColumn(DOUBLE_TYPE, rows, ordinal)
-      case StringType => toTTypeColumn(STRING_TYPE, rows, ordinal)
-      case BinaryType => toTTypeColumn(ARRAY_TYPE, rows, ordinal)
+      case BooleanType => asBooleanTColumn(rows, ordinal)
+      case ByteType => asByteTColumn(rows, ordinal)
+      case ShortType => asShortTColumn(rows, ordinal)
+      case IntegerType => asIntegerTColumn(rows, ordinal)
+      case LongType => asLongTColumn(rows, ordinal)
+      case FloatType => asFloatTColumn(rows, ordinal)
+      case DoubleType => asDoubleTColumn(rows, ordinal)
+      case StringType => asStringTColumn(rows, ordinal)
+      case BinaryType => asByteArrayTColumn(rows, ordinal)
       case _ =>
-        var i = 0
-        val rowSize = rows.length
-        val values = new java.util.ArrayList[String](rowSize)
-        while (i < rowSize) {
-          val row = rows(i)
-          nulls.set(i, row.isNullAt(ordinal))
-          values.add(RowSet.toHiveString(row.get(ordinal) -> typ, 
timeFormatters = timeFormatters))
-          i += 1
-        }
-        TColumn.stringVal(new TStringColumn(values, nulls))
+        val timeFormatters = tf
+        asStringTColumn(
+          rows,
+          ordinal,
+          "NULL",
+          (row, ordinal) =>
+            RowSet.toHiveString(
+              getColumnAs[Any](row, ordinal) -> typ,
+              timeFormatters = timeFormatters))
     }
   }
 
-  override def toTColumnValue(ordinal: Int, row: Row, types: StructType): 
TColumnValue = {
-    val timeFormatters: TimeFormatters = tf
+  override def toTColumnValue(row: Row, ordinal: Int, types: StructType): 
TColumnValue = {
     getColumnType(types, ordinal) match {
-      case BooleanType => toTTypeColumnVal(BOOLEAN_TYPE, row, ordinal)
-      case ByteType => toTTypeColumnVal(BINARY_TYPE, row, ordinal)
-      case ShortType => toTTypeColumnVal(TINYINT_TYPE, row, ordinal)
-      case IntegerType => toTTypeColumnVal(INT_TYPE, row, ordinal)
-      case LongType => toTTypeColumnVal(BIGINT_TYPE, row, ordinal)
-      case FloatType => toTTypeColumnVal(FLOAT_TYPE, row, ordinal)
-      case DoubleType => toTTypeColumnVal(DOUBLE_TYPE, row, ordinal)
-      case StringType => toTTypeColumnVal(STRING_TYPE, row, ordinal)
-      case _ =>
-        val tStrValue = new TStringValue
-        if (!row.isNullAt(ordinal)) {
-          tStrValue.setValue(RowSet.toHiveString(
-            row.get(ordinal) -> types(ordinal).dataType,
-            timeFormatters = timeFormatters))
-        }
-        TColumnValue.stringVal(tStrValue)
+      case BooleanType => asBooleanTColumnValue(row, ordinal)
+      case ByteType => asByteTColumnValue(row, ordinal)
+      case ShortType => asShortTColumnValue(row, ordinal)
+      case IntegerType => asIntegerTColumnValue(row, ordinal)
+      case LongType => asLongTColumnValue(row, ordinal)
+      case FloatType => asFloatTColumnValue(row, ordinal)
+      case DoubleType => asDoubleTColumnValue(row, ordinal)
+      case StringType => asStringTColumnValue(row, ordinal)
+      case _ => asStringTColumnValue(
+          row,
+          ordinal,
+          rawValue => RowSet.toHiveString(rawValue -> types(ordinal).dataType, 
timeFormatters = tf))
     }
   }
 
diff --git 
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/schema/TrinoTRowSetGenerator.scala
 
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/schema/TrinoTRowSetGenerator.scala
index 9c323a508..57d91b371 100644
--- 
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/schema/TrinoTRowSetGenerator.scala
+++ 
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/schema/TrinoTRowSetGenerator.scala
@@ -20,76 +20,56 @@ package org.apache.kyuubi.engine.trino.schema
 import io.trino.client.{ClientTypeSignature, Column}
 import io.trino.client.ClientStandardTypes._
 
-import org.apache.kyuubi.engine.schema.AbstractTRowSetGenerator
+import org.apache.kyuubi.engine.result.TRowSetGenerator
 import org.apache.kyuubi.engine.trino.schema.RowSet.toHiveString
 import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
-import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TTypeId._
-import org.apache.kyuubi.util.RowSetUtils._
 
 class TrinoTRowSetGenerator
-  extends AbstractTRowSetGenerator[Seq[Column], Seq[_], ClientTypeSignature] {
+  extends TRowSetGenerator[Seq[Column], Seq[_], ClientTypeSignature] {
 
   override def getColumnSizeFromSchemaType(schema: Seq[Column]): Int = 
schema.length
 
-  override def getColumnType(schema: Seq[Column], ordinal: Int): 
ClientTypeSignature = {
+  override def getColumnType(schema: Seq[Column], ordinal: Int): 
ClientTypeSignature =
     schema(ordinal).getTypeSignature
-  }
 
-  override def isColumnNullAt(row: Seq[_], ordinal: Int): Boolean =
-    row(ordinal) == null
+  override def isColumnNullAt(row: Seq[_], ordinal: Int): Boolean = 
row(ordinal) == null
 
-  override def getColumnAs[T](row: Seq[_], ordinal: Int): T =
-    row(ordinal).asInstanceOf[T]
+  override def getColumnAs[T](row: Seq[_], ordinal: Int): T = 
row(ordinal).asInstanceOf[T]
 
   override def toTColumn(rows: Seq[Seq[_]], ordinal: Int, typ: 
ClientTypeSignature): TColumn = {
-    val nulls = new java.util.BitSet()
     typ.getRawType match {
-      case BOOLEAN => toTTypeColumn(BOOLEAN_TYPE, rows, ordinal)
-      case TINYINT => toTTypeColumn(BINARY_TYPE, rows, ordinal)
-      case SMALLINT => toTTypeColumn(TINYINT_TYPE, rows, ordinal)
-      case INTEGER => toTTypeColumn(INT_TYPE, rows, ordinal)
-      case BIGINT => toTTypeColumn(BIGINT_TYPE, rows, ordinal)
-      case REAL => toTTypeColumn(FLOAT_TYPE, rows, ordinal)
-      case DOUBLE => toTTypeColumn(DOUBLE_TYPE, rows, ordinal)
-      case VARCHAR => toTTypeColumn(STRING_TYPE, rows, ordinal)
-      case VARBINARY => toTTypeColumn(ARRAY_TYPE, rows, ordinal)
+      case BOOLEAN => asBooleanTColumn(rows, ordinal)
+      case TINYINT => asByteTColumn(rows, ordinal)
+      case SMALLINT => asShortTColumn(rows, ordinal)
+      case INTEGER => asIntegerTColumn(rows, ordinal)
+      case BIGINT => asLongTColumn(rows, ordinal)
+      case REAL => asFloatTColumn(rows, ordinal)
+      case DOUBLE => asDoubleTColumn(rows, ordinal)
+      case VARCHAR => asStringTColumn(rows, ordinal)
+      case VARBINARY => asByteArrayTColumn(rows, ordinal)
       case _ =>
-        val rowSize = rows.length
-        val values = new java.util.ArrayList[String](rowSize)
-        var i = 0
-        while (i < rowSize) {
-          val row = rows(i)
-          val isNull = isColumnNullAt(row, ordinal)
-          nulls.set(i, isNull)
-          val value = if (isNull) {
-            ""
-          } else {
-            toHiveString(row(ordinal), typ)
-          }
-          values.add(value)
-          i += 1
-        }
-        TColumn.stringVal(new TStringColumn(values, nulls))
+        asStringTColumn(
+          rows,
+          ordinal,
+          convertFunc = (row, ordinal) => toHiveString(getColumnAs[Any](row, 
ordinal), typ))
     }
   }
 
-  override def toTColumnValue(ordinal: Int, row: Seq[_], types: Seq[Column]): 
TColumnValue = {
+  override def toTColumnValue(row: Seq[_], ordinal: Int, types: Seq[Column]): 
TColumnValue = {
     getColumnType(types, ordinal).getRawType match {
-      case BOOLEAN => toTTypeColumnVal(BOOLEAN_TYPE, row, ordinal)
-      case TINYINT => toTTypeColumnVal(BINARY_TYPE, row, ordinal)
-      case SMALLINT => toTTypeColumnVal(TINYINT_TYPE, row, ordinal)
-      case INTEGER => toTTypeColumnVal(INT_TYPE, row, ordinal)
-      case BIGINT => toTTypeColumnVal(BIGINT_TYPE, row, ordinal)
-      case REAL => toTTypeColumnVal(FLOAT_TYPE, row, ordinal)
-      case DOUBLE => toTTypeColumnVal(DOUBLE_TYPE, row, ordinal)
-      case VARCHAR => toTTypeColumnVal(STRING_TYPE, row, ordinal)
+      case BOOLEAN => asBooleanTColumnValue(row, ordinal)
+      case TINYINT => asByteTColumnValue(row, ordinal)
+      case SMALLINT => asShortTColumnValue(row, ordinal)
+      case INTEGER => asIntegerTColumnValue(row, ordinal)
+      case BIGINT => asLongTColumnValue(row, ordinal)
+      case REAL => asFloatTColumnValue(row, ordinal)
+      case DOUBLE => asDoubleTColumnValue(row, ordinal)
+      case VARCHAR => asStringTColumnValue(row, ordinal)
       case _ =>
-        val tStrValue = new TStringValue
-        if (row(ordinal) != null) {
-          tStrValue.setValue(
-            toHiveString(row(ordinal), types(ordinal).getTypeSignature))
-        }
-        TColumnValue.stringVal(tStrValue)
+        asStringTColumnValue(
+          row,
+          ordinal,
+          rawValue => toHiveString(rawValue, types(ordinal).getTypeSignature))
     }
   }
 
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/result/TColumnGenerator.scala
 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/result/TColumnGenerator.scala
new file mode 100644
index 000000000..e2c8f1ea6
--- /dev/null
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/result/TColumnGenerator.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.result
+import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float 
=> JFloat, Long => JLong, Short => JShort}
+import java.nio.ByteBuffer
+import java.util.{ArrayList => JArrayList, BitSet => JBitSet, List => JList}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
+
+trait TColumnGenerator[RowT] extends TRowSetColumnGetter[RowT] {
+  protected def getColumnToList[T](
+      rows: Seq[RowT],
+      ordinal: Int,
+      defaultVal: T,
+      convertFunc: (RowT, Int) => T = null): (JList[T], ByteBuffer) = {
+    val rowSize = rows.length
+    val ret = new JArrayList[T](rowSize)
+    val nulls = new JBitSet()
+    var idx = 0
+    while (idx < rowSize) {
+      val row = rows(idx)
+      val isNull = isColumnNullAt(row, ordinal)
+      if (isNull) {
+        nulls.set(idx, true)
+        ret.add(defaultVal)
+      } else {
+        val value = Option(convertFunc) match {
+          case Some(f) => f(row, ordinal)
+          case _ => getColumnAs[T](row, ordinal)
+        }
+        ret.add(value)
+      }
+      idx += 1
+    }
+    (ret, ByteBuffer.wrap(nulls.toByteArray))
+  }
+
+  def asBooleanTColumn(rows: Seq[RowT], ordinal: Int): TColumn = {
+    val (values, nulls) = getColumnToList[JBoolean](rows, ordinal, true)
+    TColumn.boolVal(new TBoolColumn(values, nulls))
+  }
+
+  def asByteTColumn(rows: Seq[RowT], ordinal: Int): TColumn = {
+    val (values, nulls) = getColumnToList[JByte](rows, ordinal, 0.toByte)
+    TColumn.byteVal(new TByteColumn(values, nulls))
+  }
+
+  def asShortTColumn(rows: Seq[RowT], ordinal: Int): TColumn = {
+    val (values, nulls) = getColumnToList[JShort](rows, ordinal, 0.toShort)
+    TColumn.i16Val(new TI16Column(values, nulls))
+  }
+
+  def asIntegerTColumn(rows: Seq[RowT], ordinal: Int): TColumn = {
+    val (values, nulls) = getColumnToList[Integer](rows, ordinal, 0)
+    TColumn.i32Val(new TI32Column(values, nulls))
+  }
+
+  def asLongTColumn(rows: Seq[RowT], ordinal: Int): TColumn = {
+    val (values, nulls) = getColumnToList[JLong](rows, ordinal, 0.toLong)
+    TColumn.i64Val(new TI64Column(values, nulls))
+  }
+
+  def asFloatTColumn(rows: Seq[RowT], ordinal: Int): TColumn = {
+    val (values, nulls) = getColumnToList[JFloat](rows, ordinal, 0.toFloat)
+    val doubleValues = values.asScala.map(f => 
JDouble.valueOf(f.toString)).asJava
+    TColumn.doubleVal(new TDoubleColumn(doubleValues, nulls))
+  }
+
+  def asDoubleTColumn(rows: Seq[RowT], ordinal: Int): TColumn = {
+    val (values, nulls) = getColumnToList[JDouble](rows, ordinal, 0.toDouble)
+    TColumn.doubleVal(new TDoubleColumn(values, nulls))
+  }
+
+  def asStringTColumn(
+      rows: Seq[RowT],
+      ordinal: Int,
+      defaultVal: String = "",
+      convertFunc: (RowT, Int) => String = null): TColumn = {
+    val (values, nulls) = getColumnToList[String](rows, ordinal, defaultVal, 
convertFunc)
+    TColumn.stringVal(new TStringColumn(values, nulls))
+  }
+
+  def asByteArrayTColumn(rows: Seq[RowT], ordinal: Int): TColumn = {
+    val (values, nulls) = getColumnToList[Array[Byte]](rows, ordinal, 
defaultVal = Array[Byte]())
+    val byteBufferValues = values.asScala.map(ByteBuffer.wrap).asJava
+    TColumn.binaryVal(new TBinaryColumn(byteBufferValues, nulls))
+  }
+}
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/result/TColumnValueGenerator.scala
 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/result/TColumnValueGenerator.scala
new file mode 100644
index 000000000..0ff3a250d
--- /dev/null
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/result/TColumnValueGenerator.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.result
+
+import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float 
=> JFloat, Long => JLong, Short => JShort}
+
+import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
+
+trait TColumnValueGenerator[RowT] extends TRowSetColumnGetter[RowT] {
+
+  def asBooleanTColumnValue(row: RowT, ordinal: Int): TColumnValue = {
+    val tValue = new TBoolValue
+    if (!isColumnNullAt(row, ordinal)) {
+      tValue.setValue(getColumnAs[JBoolean](row, ordinal))
+    }
+    TColumnValue.boolVal(tValue)
+  }
+
+  def asByteTColumnValue(row: RowT, ordinal: Int): TColumnValue = {
+    val tValue = new TByteValue
+    if (!isColumnNullAt(row, ordinal)) {
+      tValue.setValue(getColumnAs[JByte](row, ordinal))
+    }
+    TColumnValue.byteVal(tValue)
+  }
+
+  def asShortTColumnValue(row: RowT, ordinal: Int): TColumnValue = {
+    val tValue = new TI16Value
+    if (!isColumnNullAt(row, ordinal)) {
+      tValue.setValue(getColumnAs[JShort](row, ordinal))
+    }
+    TColumnValue.i16Val(tValue)
+  }
+
+  def asIntegerTColumnValue(row: RowT, ordinal: Int): TColumnValue = {
+    val tValue = new TI32Value
+    if (!isColumnNullAt(row, ordinal)) {
+      tValue.setValue(getColumnAs[Integer](row, ordinal))
+    }
+    TColumnValue.i32Val(tValue)
+  }
+
+  def asLongTColumnValue(row: RowT, ordinal: Int): TColumnValue = {
+    val tValue = new TI64Value
+    if (!isColumnNullAt(row, ordinal)) {
+      tValue.setValue(getColumnAs[JLong](row, ordinal))
+    }
+    TColumnValue.i64Val(tValue)
+  }
+
+  def asFloatTColumnValue(row: RowT, ordinal: Int): TColumnValue = {
+    val tValue = new TDoubleValue
+    if (!isColumnNullAt(row, ordinal)) {
+      tValue.setValue(getColumnAs[JFloat](row, ordinal).toDouble)
+    }
+    TColumnValue.doubleVal(tValue)
+  }
+
+  def asDoubleTColumnValue(row: RowT, ordinal: Int): TColumnValue = {
+    val tValue = new TDoubleValue
+    if (!isColumnNullAt(row, ordinal)) {
+      tValue.setValue(getColumnAs[JDouble](row, ordinal))
+    }
+    TColumnValue.doubleVal(tValue)
+  }
+
+  def asStringTColumnValue(
+      row: RowT,
+      ordinal: Int,
+      convertFunc: Any => String = null): TColumnValue = {
+    val tValue = new TStringValue
+    if (!isColumnNullAt(row, ordinal)) {
+      val str = getColumnAs[Any](row, ordinal) match {
+        case strObj: String => strObj
+        case obj if convertFunc != null => convertFunc(obj)
+        case anyObj => String.valueOf(anyObj)
+      }
+      tValue.setValue(str)
+    }
+    TColumnValue.stringVal(tValue)
+  }
+}
diff --git 
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/postgresql/PostgreSQLTRowSetGenerator.scala
 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/result/TRowSetColumnGetter.scala
similarity index 60%
copy from 
externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/postgresql/PostgreSQLTRowSetGenerator.scala
copy to 
kyuubi-common/src/main/scala/org/apache/kyuubi/engine/result/TRowSetColumnGetter.scala
index 0d02aa732..3f6b6a16a 100644
--- 
a/externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/postgresql/PostgreSQLTRowSetGenerator.scala
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/result/TRowSetColumnGetter.scala
@@ -14,16 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kyuubi.engine.jdbc.postgresql
 
-import org.apache.kyuubi.engine.jdbc.schema.DefaultJdbcTRowSetGenerator
-import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TColumn, TColumnValue}
+package org.apache.kyuubi.engine.result
 
-class PostgreSQLTRowSetGenerator extends DefaultJdbcTRowSetGenerator {
+trait TRowSetColumnGetter[RowT] {
+  protected def isColumnNullAt(row: RowT, ordinal: Int): Boolean
 
-  override def toSmallIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
-    toIntegerTColumn(rows, ordinal)
-
-  override protected def toSmallIntTColumnValue(row: Seq[_], ordinal: Int): 
TColumnValue =
-    toIntegerTColumnValue(row, ordinal)
+  protected def getColumnAs[T](row: RowT, ordinal: Int): T
 }
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/result/TRowSetGenerator.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/result/TRowSetGenerator.scala
new file mode 100644
index 000000000..096e45ad8
--- /dev/null
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/result/TRowSetGenerator.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.result
+import java.util.{ArrayList => JArrayList}
+
+import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
+
+trait TRowSetGenerator[SchemaT, RowT, ColumnT]
+  extends TColumnValueGenerator[RowT] with TColumnGenerator[RowT] {
+
+  def getColumnSizeFromSchemaType(schema: SchemaT): Int
+
+  def getColumnType(schema: SchemaT, ordinal: Int): ColumnT
+
+  def toTColumn(rows: Seq[RowT], ordinal: Int, typ: ColumnT): TColumn
+
+  def toTColumnValue(row: RowT, ordinal: Int, types: SchemaT): TColumnValue
+
+  def toTRowSet(rows: Seq[RowT], schema: SchemaT, protocolVersion: TProtocolVersion): TRowSet = {
+    if (protocolVersion.getValue < TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6.getValue) {
+      toRowBasedSet(rows, schema)
+    } else {
+      toColumnBasedSet(rows, schema)
+    }
+  }
+
+  def toRowBasedSet(rows: Seq[RowT], schema: SchemaT): TRowSet = {
+    val rowSize = rows.length
+    val tRows = new JArrayList[TRow](rowSize)
+    var i = 0
+    while (i < rowSize) {
+      val row = rows(i)
+      var j = 0
+      val columnSize = getColumnSizeFromSchemaType(schema)
+      val tColumnValues = new JArrayList[TColumnValue](columnSize)
+      while (j < columnSize) {
+        val columnValue = toTColumnValue(row, j, schema)
+        tColumnValues.add(columnValue)
+        j += 1
+      }
+      i += 1
+      val tRow = new TRow(tColumnValues)
+      tRows.add(tRow)
+    }
+    new TRowSet(0, tRows)
+  }
+
+  def toColumnBasedSet(rows: Seq[RowT], schema: SchemaT): TRowSet = {
+    val rowSize = rows.length
+    val tRowSet = new TRowSet(0, new JArrayList[TRow](rowSize))
+    var i = 0
+    val columnSize = getColumnSizeFromSchemaType(schema)
+    val tColumns = new JArrayList[TColumn](columnSize)
+    while (i < columnSize) {
+      val tColumn = toTColumn(rows, i, getColumnType(schema, i))
+      tColumns.add(tColumn)
+      i += 1
+    }
+    tRowSet.setColumns(tColumns)
+    tRowSet
+  }
+}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/schema/AbstractTRowSetGenerator.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/schema/AbstractTRowSetGenerator.scala
deleted file mode 100644
index 3433bc2b0..000000000
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/schema/AbstractTRowSetGenerator.scala
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.engine.schema
-import java.nio.ByteBuffer
-import java.util.{ArrayList => JArrayList, BitSet => JBitSet, List => JList}
-
-import scala.collection.JavaConverters._
-
-import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
-import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TTypeId._
-import org.apache.kyuubi.util.RowSetUtils.bitSetToBuffer
-
-trait AbstractTRowSetGenerator[SchemaT, RowT, ColumnT] {
-
-  protected def getColumnSizeFromSchemaType(schema: SchemaT): Int
-
-  protected def getColumnType(schema: SchemaT, ordinal: Int): ColumnT
-
-  protected def isColumnNullAt(row: RowT, ordinal: Int): Boolean
-
-  protected def getColumnAs[T](row: RowT, ordinal: Int): T
-
-  protected def toTColumn(rows: Seq[RowT], ordinal: Int, typ: ColumnT): TColumn
-
-  protected def toTColumnValue(ordinal: Int, row: RowT, types: SchemaT): TColumnValue
-
-  def toTRowSet(
-      rows: Seq[RowT],
-      schema: SchemaT,
-      protocolVersion: TProtocolVersion): TRowSet = {
-    if (protocolVersion.getValue < TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6.getValue) {
-      toRowBasedSet(rows, schema)
-    } else {
-      toColumnBasedSet(rows, schema)
-    }
-  }
-
-  def toRowBasedSet(rows: Seq[RowT], schema: SchemaT): TRowSet = {
-    val rowSize = rows.length
-    val tRows = new JArrayList[TRow](rowSize)
-    var i = 0
-    while (i < rowSize) {
-      val row = rows(i)
-      var j = 0
-      val columnSize = getColumnSizeFromSchemaType(schema)
-      val tColumnValues = new JArrayList[TColumnValue](columnSize)
-      while (j < columnSize) {
-        val columnValue = toTColumnValue(j, row, schema)
-        tColumnValues.add(columnValue)
-        j += 1
-      }
-      i += 1
-      val tRow = new TRow(tColumnValues)
-      tRows.add(tRow)
-    }
-    new TRowSet(0, tRows)
-  }
-
-  def toColumnBasedSet(rows: Seq[RowT], schema: SchemaT): TRowSet = {
-    val rowSize = rows.length
-    val tRowSet = new TRowSet(0, new JArrayList[TRow](rowSize))
-    var i = 0
-    val columnSize = getColumnSizeFromSchemaType(schema)
-    val tColumns = new JArrayList[TColumn](columnSize)
-    while (i < columnSize) {
-      val tColumn = toTColumn(rows, i, getColumnType(schema, i))
-      tColumns.add(tColumn)
-      i += 1
-    }
-    tRowSet.setColumns(tColumns)
-    tRowSet
-  }
-
-  protected def getOrSetAsNull[T](
-      rows: Seq[RowT],
-      ordinal: Int,
-      nulls: JBitSet,
-      defaultVal: T): JList[T] = {
-    val size = rows.length
-    val ret = new JArrayList[T](size)
-    var idx = 0
-    while (idx < size) {
-      val row = rows(idx)
-      val isNull = isColumnNullAt(row, ordinal)
-      if (isNull) {
-        nulls.set(idx, true)
-        ret.add(defaultVal)
-      } else {
-        ret.add(getColumnAs[T](row, ordinal))
-      }
-      idx += 1
-    }
-    ret
-  }
-
-  protected def toTTypeColumnVal(typeId: TTypeId, row: RowT, ordinal: Int): TColumnValue = {
-    def isNull = isColumnNullAt(row, ordinal)
-    typeId match {
-      case BOOLEAN_TYPE =>
-        val boolValue = new TBoolValue
-        if (!isNull) boolValue.setValue(getColumnAs[java.lang.Boolean](row, ordinal))
-        TColumnValue.boolVal(boolValue)
-
-      case BINARY_TYPE =>
-        val byteValue = new TByteValue
-        if (!isNull) byteValue.setValue(getColumnAs[java.lang.Byte](row, ordinal))
-        TColumnValue.byteVal(byteValue)
-
-      case SMALLINT_TYPE | TINYINT_TYPE =>
-        val tI16Value = new TI16Value
-        if (!isNull) tI16Value.setValue(getColumnAs[java.lang.Short](row, ordinal))
-        TColumnValue.i16Val(tI16Value)
-
-      case INT_TYPE =>
-        val tI32Value = new TI32Value
-        if (!isNull) tI32Value.setValue(getColumnAs[java.lang.Integer](row, ordinal))
-        TColumnValue.i32Val(tI32Value)
-
-      case BIGINT_TYPE =>
-        val tI64Value = new TI64Value
-        if (!isNull) tI64Value.setValue(getColumnAs[java.lang.Long](row, ordinal))
-        TColumnValue.i64Val(tI64Value)
-
-      case FLOAT_TYPE =>
-        val tDoubleValue = new TDoubleValue
-        if (!isNull) tDoubleValue.setValue(getColumnAs[java.lang.Float](row, ordinal).toDouble)
-        TColumnValue.doubleVal(tDoubleValue)
-
-      case DOUBLE_TYPE =>
-        val tDoubleValue = new TDoubleValue
-        if (!isNull) tDoubleValue.setValue(getColumnAs[java.lang.Double](row, ordinal))
-        TColumnValue.doubleVal(tDoubleValue)
-
-      case STRING_TYPE =>
-        val tStringValue = new TStringValue
-        if (!isNull) tStringValue.setValue(getColumnAs[String](row, ordinal))
-        TColumnValue.stringVal(tStringValue)
-
-      case otherType =>
-        throw new UnsupportedOperationException(s"unsupported type $otherType for toTTypeColumnVal")
-    }
-  }
-
-  protected def toTTypeColumn(typeId: TTypeId, rows: Seq[RowT], ordinal: Int): TColumn = {
-    val nulls = new JBitSet()
-    typeId match {
-      case BOOLEAN_TYPE =>
-        val values = getOrSetAsNull[java.lang.Boolean](rows, ordinal, nulls, true)
-        TColumn.boolVal(new TBoolColumn(values, nulls))
-
-      case BINARY_TYPE =>
-        val values = getOrSetAsNull[java.lang.Byte](rows, ordinal, nulls, 0.toByte)
-        TColumn.byteVal(new TByteColumn(values, nulls))
-
-      case SMALLINT_TYPE | TINYINT_TYPE =>
-        val values = getOrSetAsNull[java.lang.Short](rows, ordinal, nulls, 0.toShort)
-        TColumn.i16Val(new TI16Column(values, nulls))
-
-      case INT_TYPE =>
-        val values = getOrSetAsNull[java.lang.Integer](rows, ordinal, nulls, 0)
-        TColumn.i32Val(new TI32Column(values, nulls))
-
-      case BIGINT_TYPE =>
-        val values = getOrSetAsNull[java.lang.Long](rows, ordinal, nulls, 0L)
-        TColumn.i64Val(new TI64Column(values, nulls))
-
-      case FLOAT_TYPE =>
-        val values = getOrSetAsNull[java.lang.Float](rows, ordinal, nulls, 0.toFloat)
-          .asScala.map(n => java.lang.Double.valueOf(n.toString)).asJava
-        TColumn.doubleVal(new TDoubleColumn(values, nulls))
-
-      case DOUBLE_TYPE =>
-        val values = getOrSetAsNull[java.lang.Double](rows, ordinal, nulls, 0.toDouble)
-        TColumn.doubleVal(new TDoubleColumn(values, nulls))
-
-      case STRING_TYPE =>
-        val values = getOrSetAsNull[java.lang.String](rows, ordinal, nulls, "")
-        TColumn.stringVal(new TStringColumn(values, nulls))
-
-      case CHAR_TYPE =>
-        val values = getOrSetAsNull[java.lang.String](rows, ordinal, nulls, "")
-        TColumn.stringVal(new TStringColumn(values, nulls))
-
-      case ARRAY_TYPE =>
-        val values = getOrSetAsNull[Array[Byte]](rows, ordinal, nulls, Array())
-          .asScala
-          .map(ByteBuffer.wrap)
-          .asJava
-        TColumn.binaryVal(new TBinaryColumn(values, nulls))
-
-      case otherType =>
-        throw new UnsupportedOperationException(s"unsupported type $otherType for toTTypeColumnVal")
-    }
-  }
-}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/RowSetUtils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/RowSetUtils.scala
index f320fd902..c79c20327 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/RowSetUtils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/RowSetUtils.scala
@@ -17,15 +17,12 @@
 
 package org.apache.kyuubi.util
 
-import java.nio.ByteBuffer
 import java.time.{Instant, LocalDate, LocalDateTime, LocalTime, ZoneId}
 import java.time.chrono.IsoChronology
 import java.time.format.DateTimeFormatterBuilder
 import java.time.temporal.ChronoField
 import java.util.{Date, Locale}
 
-import scala.language.implicitConversions
-
 import org.apache.commons.lang3.time.FastDateFormat
 
 private[kyuubi] object RowSetUtils {
@@ -77,8 +74,4 @@ private[kyuubi] object RowSetUtils {
     timeZone.map(timestampFormatter.withZone(_).format(i))
       .getOrElse(timestampFormatter.format(i))
   }
-
-  implicit def bitSetToBuffer(bitSet: java.util.BitSet): ByteBuffer = {
-    ByteBuffer.wrap(bitSet.toByteArray)
-  }
 }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/sql/schema/ServerTRowSetGenerator.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/sql/schema/ServerTRowSetGenerator.scala
index e1a9d55a6..96294c6eb 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/sql/schema/ServerTRowSetGenerator.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/sql/schema/ServerTRowSetGenerator.scala
@@ -17,13 +17,12 @@
 
 package org.apache.kyuubi.sql.schema
 
-import org.apache.kyuubi.engine.schema.AbstractTRowSetGenerator
+import org.apache.kyuubi.engine.result.TRowSetGenerator
 import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
 import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TTypeId._
-import org.apache.kyuubi.util.RowSetUtils._
 
 class ServerTRowSetGenerator
-  extends AbstractTRowSetGenerator[Schema, Row, TTypeId] {
+  extends TRowSetGenerator[Schema, Row, TTypeId] {
 
   override def getColumnSizeFromSchemaType(schema: Schema): Int = schema.length
 
@@ -34,44 +33,35 @@ class ServerTRowSetGenerator
   override def getColumnAs[T](row: Row, ordinal: Int): T = row.getAs[T](ordinal)
 
   override def toTColumn(rows: Seq[Row], ordinal: Int, typ: TTypeId): TColumn = {
-    val nulls = new java.util.BitSet()
     typ match {
-      case t @ (BOOLEAN_TYPE | BINARY_TYPE | BINARY_TYPE | TINYINT_TYPE | INT_TYPE |
-          BIGINT_TYPE | FLOAT_TYPE | DOUBLE_TYPE | STRING_TYPE) =>
-        toTTypeColumn(t, rows, ordinal)
-
+      case BOOLEAN_TYPE => asBooleanTColumn(rows, ordinal)
+      case BINARY_TYPE => asShortTColumn(rows, ordinal)
+      case TINYINT_TYPE => asShortTColumn(rows, ordinal)
+      case INT_TYPE => asIntegerTColumn(rows, ordinal)
+      case BIGINT_TYPE => asLongTColumn(rows, ordinal)
+      case FLOAT_TYPE => asFloatTColumn(rows, ordinal)
+      case DOUBLE_TYPE => asDoubleTColumn(rows, ordinal)
+      case STRING_TYPE => asStringTColumn(rows, ordinal)
       case _ =>
-        var i = 0
-        val rowSize = rows.length
-        val values = new java.util.ArrayList[String](rowSize)
-        while (i < rowSize) {
-          val row = rows(i)
-          val isNull = isColumnNullAt(row, ordinal)
-          nulls.set(i, isNull)
-          val value = if (isNull) {
-            ""
-          } else {
-            (row.get(ordinal), typ).toString()
-          }
-          values.add(value)
-          i += 1
-        }
-        TColumn.stringVal(new TStringColumn(values, nulls))
+        asStringTColumn(
+          rows,
+          ordinal,
+          convertFunc = (row, ordinal) => (row.get(ordinal), typ).toString())
     }
   }
 
-  override def toTColumnValue(ordinal: Int, row: Row, types: Schema): TColumnValue = {
+  override def toTColumnValue(row: Row, ordinal: Int, types: Schema): TColumnValue = {
     getColumnType(types, ordinal) match {
-      case t @ (BOOLEAN_TYPE | BINARY_TYPE | BINARY_TYPE | TINYINT_TYPE | INT_TYPE |
-          BIGINT_TYPE | FLOAT_TYPE | DOUBLE_TYPE | STRING_TYPE) =>
-        toTTypeColumnVal(t, row, ordinal)
-
-      case _ =>
-        val tStrValue = new TStringValue
-        if (!isColumnNullAt(row, ordinal)) {
-          tStrValue.setValue((row.get(ordinal), types(ordinal).dataType).toString())
-        }
-        TColumnValue.stringVal(tStrValue)
+      case BOOLEAN_TYPE => asBooleanTColumnValue(row, ordinal)
+      case BINARY_TYPE => asByteTColumnValue(row, ordinal)
+      case TINYINT_TYPE => asShortTColumnValue(row, ordinal)
+      case INT_TYPE => asIntegerTColumnValue(row, ordinal)
+      case BIGINT_TYPE => asLongTColumnValue(row, ordinal)
+      case FLOAT_TYPE => asFloatTColumnValue(row, ordinal)
+      case DOUBLE_TYPE => asDoubleTColumnValue(row, ordinal)
+      case STRING_TYPE => asStringTColumnValue(row, ordinal)
+      case otherType =>
+        asStringTColumnValue(row, ordinal, rawValue => (rawValue, otherType).toString())
     }
   }
 
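
For illustration only, the following is a minimal sketch (not part of this commit) of an engine-side generator built on the new trait, mapping an engine's own column-type names straight to Java-typed TRowSet values in the style of ServerTRowSetGenerator above. The ExampleTRowSetGenerator name, the Seq[String] schema, the Seq[Any] row shape, and the type-name strings are illustrative assumptions, not Kyuubi APIs.

import org.apache.kyuubi.engine.result.TRowSetGenerator
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TColumn, TColumnValue}

// Hypothetical sketch: the schema is just the ordered list of engine type names,
// and a row is a plain Seq[Any]; both stand in for a real engine's own types.
class ExampleTRowSetGenerator extends TRowSetGenerator[Seq[String], Seq[Any], String] {

  override def getColumnSizeFromSchemaType(schema: Seq[String]): Int = schema.length

  override def getColumnType(schema: Seq[String], ordinal: Int): String = schema(ordinal)

  // Null checks and raw value access required by TRowSetColumnGetter.
  override def isColumnNullAt(row: Seq[Any], ordinal: Int): Boolean = row(ordinal) == null

  override def getColumnAs[T](row: Seq[Any], ordinal: Int): T = row(ordinal).asInstanceOf[T]

  // Column-based encoding: one TColumn per column across all rows.
  override def toTColumn(rows: Seq[Seq[Any]], ordinal: Int, typ: String): TColumn =
    typ match {
      case "boolean" => asBooleanTColumn(rows, ordinal)
      case "int" => asIntegerTColumn(rows, ordinal)
      case "bigint" => asLongTColumn(rows, ordinal)
      case _ => asStringTColumn(rows, ordinal)
    }

  // Row-based encoding: one TColumnValue per cell.
  override def toTColumnValue(row: Seq[Any], ordinal: Int, schema: Seq[String]): TColumnValue =
    getColumnType(schema, ordinal) match {
      case "boolean" => asBooleanTColumnValue(row, ordinal)
      case "int" => asIntegerTColumnValue(row, ordinal)
      case "bigint" => asLongTColumnValue(row, ordinal)
      case _ => asStringTColumnValue(row, ordinal)
    }
}

With only these Java-type mappings supplied, the inherited toTRowSet(rows, schema, protocolVersion) picks row-based encoding for clients older than HIVE_CLI_SERVICE_PROTOCOL_V6 and column-based encoding otherwise, without the engine touching any Thrift type ids.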
