This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 4463cc8f9 [KYUUBI #5811] [SPARK] Reuse time formatters instance in value serialization of TRowSet generation
4463cc8f9 is described below
commit 4463cc8f976d77e01b96bd9d70665f16e8fb7529
Author: Bowen Liang <[email protected]>
AuthorDate: Thu Dec 7 20:22:18 2023 +0800
[KYUUBI #5811] [SPARK] Reuse time formatters instance in value serialization of TRowSet generation
# :mag: Description
## Issue References 🔗
Subtask of #5808.
## Describe Your Solution 🔧
Value serialization to Hive-style strings via `HiveResult.toHiveString`
requires a `TimeFormatters` instance to handle the date/time data types.
Reusing a pre-created `TimeFormatters` instance, instead of recreating one
for every value, dramatically reduces that overhead and speeds up TRowSet
generation.
This may also reduce the memory footprint, since fewer short-lived formatter
objects are allocated during TRowSet generation.
This also aligns with Spark's `RowSetUtils` implementation of RowSet
generation from
[SPARK-39041](https://issues.apache.org/jira/browse/SPARK-39041) by yaooqinn,
which likewise passes an explicitly declared `TimeFormatters` instance.
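For illustration, a minimal sketch of the hoisting pattern applied here
(assuming Spark 3.1 or later, where `HiveResult.getTimeFormatters` and the
three-argument `HiveResult.toHiveString` are available; the object and method
names below are hypothetical, not part of this patch):

```scala
import org.apache.spark.sql.execution.HiveResult
import org.apache.spark.sql.execution.HiveResult.TimeFormatters
import org.apache.spark.sql.types.DataType

object TimeFormattersSketch {
  // Before: a fresh TimeFormatters is built for every single value.
  def toHiveStringPerValue(value: Any, dt: DataType): String =
    HiveResult.toHiveString(value -> dt, false, HiveResult.getTimeFormatters)

  // After: build the formatters once per TRowSet and reuse them for every
  // value, as RowSet.toRowBasedSet / toColumnBasedSet do in this patch.
  def serializeColumn(values: Seq[Any], dt: DataType): Seq[String] = {
    val timeFormatters: TimeFormatters = HiveResult.getTimeFormatters
    values.map(v => HiveResult.toHiveString(v -> dt, false, timeFormatters))
  }
}
```

The only moving part is where `getTimeFormatters` is called: it leaves the
per-value path and runs once per row set.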
## Types of changes :bookmark:
- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
## Test Plan 🧪
#### Behavior Without This Pull Request :coffin:
#### Behavior With This Pull Request :tada:
#### Related Unit Tests
---
# Checklists
## 📝 Author Self Checklist
- [x] My code follows the [style
guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html)
of this project
- [x] I have performed a self-review
- [x] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [ ] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my feature
works
- [ ] New and existing unit tests pass locally with my changes
- [x] This patch was not authored or co-authored using [Generative
Tooling](https://www.apache.org/legal/generative-tooling.html)
## 📝 Committer Pre-Merge Checklist
- [x] Pull request title is okay.
- [x] No license issues.
- [x] Milestone correctly set?
- [x] Test coverage is ok
- [x] Assignees are selected.
- [x] Minimum number of approvals
- [x] No changes are requested
**Be nice. Be informative.**
Closes #5811 from bowenliang123/rowset-timeformatter.
Closes #5811
22709914e [Bowen Liang] Reuse time formatters instance in value serialization of TRowSet
Authored-by: Bowen Liang <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../apache/kyuubi/engine/spark/schema/RowSet.scala | 29 +++++++++++++++-------
.../spark/sql/kyuubi/SparkDatasetHelper.scala | 23 ++++++++++++++---
.../kyuubi/engine/spark/schema/RowSetSuite.scala | 19 ++++++++++----
3 files changed, 53 insertions(+), 18 deletions(-)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala
index d4ba5460a..b9e9f6411 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.HiveResult
+import org.apache.spark.sql.execution.HiveResult.TimeFormatters
import org.apache.spark.sql.types._
import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
@@ -30,9 +31,10 @@ import org.apache.kyuubi.util.RowSetUtils._
object RowSet {
- def toHiveString(valueAndType: (Any, DataType), nested: Boolean = false): String = {
- // compatible w/ Spark 3.1 and above
- val timeFormatters = HiveResult.getTimeFormatters
+ def toHiveString(
+ valueAndType: (Any, DataType),
+ nested: Boolean = false,
+ timeFormatters: TimeFormatters): String = {
HiveResult.toHiveString(valueAndType, nested, timeFormatters)
}
@@ -71,6 +73,7 @@ object RowSet {
def toRowBasedSet(rows: Seq[Row], schema: StructType): TRowSet = {
val rowSize = rows.length
val tRows = new java.util.ArrayList[TRow](rowSize)
+ val timeFormatters = HiveResult.getTimeFormatters
var i = 0
while (i < rowSize) {
val row = rows(i)
@@ -78,7 +81,7 @@ object RowSet {
var j = 0
val columnSize = row.length
while (j < columnSize) {
- val columnValue = toTColumnValue(j, row, schema)
+ val columnValue = toTColumnValue(j, row, schema, timeFormatters)
tRow.addToColVals(columnValue)
j += 1
}
@@ -91,18 +94,23 @@ object RowSet {
def toColumnBasedSet(rows: Seq[Row], schema: StructType): TRowSet = {
val rowSize = rows.length
val tRowSet = new TRowSet(0, new java.util.ArrayList[TRow](rowSize))
+ val timeFormatters = HiveResult.getTimeFormatters
var i = 0
val columnSize = schema.length
while (i < columnSize) {
val field = schema(i)
- val tColumn = toTColumn(rows, i, field.dataType)
+ val tColumn = toTColumn(rows, i, field.dataType, timeFormatters)
tRowSet.addToColumns(tColumn)
i += 1
}
tRowSet
}
- private def toTColumn(rows: Seq[Row], ordinal: Int, typ: DataType): TColumn = {
+ private def toTColumn(
+ rows: Seq[Row],
+ ordinal: Int,
+ typ: DataType,
+ timeFormatters: TimeFormatters): TColumn = {
val nulls = new java.util.BitSet()
typ match {
case BooleanType =>
@@ -152,7 +160,7 @@ object RowSet {
while (i < rowSize) {
val row = rows(i)
nulls.set(i, row.isNullAt(ordinal))
- values.add(toHiveString(row.get(ordinal) -> typ))
+ values.add(toHiveString(row.get(ordinal) -> typ, timeFormatters = timeFormatters))
i += 1
}
TColumn.stringVal(new TStringColumn(values, nulls))
@@ -184,7 +192,8 @@ object RowSet {
private def toTColumnValue(
ordinal: Int,
row: Row,
- types: StructType): TColumnValue = {
+ types: StructType,
+ timeFormatters: TimeFormatters): TColumnValue = {
types(ordinal).dataType match {
case BooleanType =>
val boolValue = new TBoolValue
@@ -232,7 +241,9 @@ object RowSet {
case _ =>
val tStrValue = new TStringValue
if (!row.isNullAt(ordinal)) {
- tStrValue.setValue(toHiveString(row.get(ordinal) -> types(ordinal).dataType))
+ tStrValue.setValue(toHiveString(
+ row.get(ordinal) -> types(ordinal).dataType,
+ timeFormatters = timeFormatters))
}
TColumnValue.stringVal(tStrValue)
}
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
index c0f9d61c2..e84312268 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
@@ -26,6 +26,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.{CollectLimitExec, LocalTableScanExec, SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.HiveResult
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.arrow.KyuubiArrowConverters
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
@@ -105,17 +106,31 @@ object SparkDatasetHelper extends Logging {
val quotedCol = (name: String) => col(quoteIfNeeded(name))
// an udf to call `RowSet.toHiveString` on complex types(struct/array/map) and timestamp type.
+ // TODO: reuse the timeFormatters on greater scale if possible,
+ // recreating timeFormatters may cause performance issue, see [KYUUBI#5811]
val toHiveStringUDF = udf[String, Row, String]((row, schemaDDL) => {
val dt = DataType.fromDDL(schemaDDL)
dt match {
case StructType(Array(StructField(_, st: StructType, _, _))) =>
- RowSet.toHiveString((row, st), nested = true)
+ RowSet.toHiveString(
+ (row, st),
+ nested = true,
+ timeFormatters = HiveResult.getTimeFormatters)
case StructType(Array(StructField(_, at: ArrayType, _, _))) =>
- RowSet.toHiveString((row.toSeq.head, at), nested = true)
+ RowSet.toHiveString(
+ (row.toSeq.head, at),
+ nested = true,
+ timeFormatters = HiveResult.getTimeFormatters)
case StructType(Array(StructField(_, mt: MapType, _, _))) =>
- RowSet.toHiveString((row.toSeq.head, mt), nested = true)
+ RowSet.toHiveString(
+ (row.toSeq.head, mt),
+ nested = true,
+ timeFormatters = HiveResult.getTimeFormatters)
case StructType(Array(StructField(_, tt: TimestampType, _, _))) =>
- RowSet.toHiveString((row.toSeq.head, tt), nested = true)
+ RowSet.toHiveString(
+ (row.toSeq.head, tt),
+ nested = true,
+ timeFormatters = HiveResult.getTimeFormatters)
case _ =>
throw new UnsupportedOperationException
}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/schema/RowSetSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/schema/RowSetSuite.scala
index 50bcfa800..dec185897 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/schema/RowSetSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/schema/RowSetSuite.scala
@@ -25,6 +25,7 @@ import java.time.{Instant, LocalDate}
import scala.collection.JavaConverters._
import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.HiveResult
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval
@@ -165,14 +166,18 @@ class RowSetSuite extends KyuubiFunSuite {
dateCol.getValues.asScala.zipWithIndex.foreach {
case (b, 11) => assert(b === "NULL")
case (b, i) =>
- assert(b === RowSet.toHiveString(Date.valueOf(s"2018-11-${i + 1}") -> DateType))
+ assert(b === RowSet.toHiveString(
+ Date.valueOf(s"2018-11-${i + 1}") -> DateType,
+ timeFormatters = HiveResult.getTimeFormatters))
}
val tsCol = cols.next().getStringVal
tsCol.getValues.asScala.zipWithIndex.foreach {
case (b, 11) => assert(b === "NULL")
case (b, i) => assert(b ===
- RowSet.toHiveString(Timestamp.valueOf(s"2018-11-17 13:33:33.$i") -> TimestampType))
+ RowSet.toHiveString(
+ Timestamp.valueOf(s"2018-11-17 13:33:33.$i") -> TimestampType,
+ timeFormatters = HiveResult.getTimeFormatters))
}
val binCol = cols.next().getBinaryVal
@@ -185,14 +190,16 @@ class RowSetSuite extends KyuubiFunSuite {
arrCol.getValues.asScala.zipWithIndex.foreach {
case (b, 11) => assert(b === "NULL")
case (b, i) => assert(b === RowSet.toHiveString(
- Array.fill(i)(java.lang.Double.valueOf(s"$i.$i")).toSeq -> ArrayType(DoubleType)))
+ Array.fill(i)(java.lang.Double.valueOf(s"$i.$i")).toSeq -> ArrayType(DoubleType),
+ timeFormatters = HiveResult.getTimeFormatters))
}
val mapCol = cols.next().getStringVal
mapCol.getValues.asScala.zipWithIndex.foreach {
case (b, 11) => assert(b === "NULL")
case (b, i) => assert(b === RowSet.toHiveString(
- Map(i -> java.lang.Double.valueOf(s"$i.$i")) -> MapType(IntegerType, DoubleType)))
+ Map(i -> java.lang.Double.valueOf(s"$i.$i")) -> MapType(IntegerType, DoubleType),
+ timeFormatters = HiveResult.getTimeFormatters))
}
val intervalCol = cols.next().getStringVal
@@ -241,7 +248,9 @@ class RowSetSuite extends KyuubiFunSuite {
val r8 = iter.next().getColVals
assert(r8.get(12).getStringVal.getValue === Array.fill(7)(7.7d).mkString("[", ",", "]"))
assert(r8.get(13).getStringVal.getValue ===
- RowSet.toHiveString(Map(7 -> 7.7d) -> MapType(IntegerType, DoubleType)))
+ RowSet.toHiveString(
+ Map(7 -> 7.7d) -> MapType(IntegerType, DoubleType),
+ timeFormatters = HiveResult.getTimeFormatters))
val r9 = iter.next().getColVals
assert(r9.get(14).getStringVal.getValue === new CalendarInterval(8, 8, 8).toString)