This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 5b592d07c [KYUUBI #6404] Fix HiveResult.toHiveString compatibility for
Spark 4.0
5b592d07c is described below
commit 5b592d07cab9c77cac136b496e3649f511627e57
Author: Cheng Pan <[email protected]>
AuthorDate: Wed May 22 15:07:36 2024 +0800
[KYUUBI #6404] Fix HiveResult.toHiveString compatibility for Spark 4.0
# :mag: Description
SPARK-47911 introduced breaking changes for `HiveResult.toHiveString`, here
we use reflection to fix the compatibility.
## Types of changes :bookmark:
- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
## Test Plan ๐งช
```
build/mvn clean install -Pscala-2.13 -Pspark-master \
-pl externals/kyuubi-spark-sql-engine -am \
-Dtest=none \
-DwildcardSuites=org.apache.kyuubi.engine.spark.schema.RowSetSuite
```
before - compilation error
```
[INFO] --- scala-maven-plugin:4.8.0:compile (scala-compile-first)
kyuubi-spark-sql-engine_2.13 ---
...
[ERROR] [Error]
/home/kyuubi/apache-kyuubi/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala:30:
not enough arguments for method toHiveString: (a: (Any,
org.apache.spark.sql.types.DataType), nested: Boolean, formatters:
org.apache.spark.sql.execution.HiveResult.TimeFormatters, binaryFormatter:
org.apache.spark.sql.execution.HiveResult.BinaryFormatter): String.
Unspecified value parameter binaryFormatter.
```
after - UT pass
```
[INFO] --- scalatest-maven-plugin:2.2.0:test (test)
kyuubi-spark-sql-engine_2.13 ---
[INFO] ScalaTest report directory:
/home/kyuubi/apache-kyuubi/externals/kyuubi-spark-sql-engine/target/surefire-reports
Discovery starting.
Discovery completed in 1 second, 959 milliseconds.
Run starting. Expected test count is: 3
RowSetSuite:
- column based set
- row based set
- to row set
Run completed in 2 seconds, 712 milliseconds.
Total number of tests run: 3
Suites: completed 2, aborted 0
Tests: succeeded 3, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```
---
# Checklist 📝
- [x] This patch was not authored or co-authored using [Generative
Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #6404 from pan3793/hive-string.
Closes #6404
6b3c743eb [Cheng Pan] fix breaking change of HiveResult.toHiveString caused
by SPARK-47911
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../kyuubi/engine/spark/SparkSQLEngine.scala | 5 +++
.../apache/kyuubi/engine/spark/schema/RowSet.scala | 36 +++++++++++++++++++---
.../spark/schema/SparkTRowSetGenerator.scala | 13 ++++++--
.../spark/sql/kyuubi/SparkDatasetHelper.scala | 14 ++++++---
.../kyuubi/engine/spark/schema/RowSetSuite.scala | 15 ++++++---
5 files changed, 65 insertions(+), 18 deletions(-)
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index c9a5f21dd..d4418ec26 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -262,6 +262,11 @@ object SparkSQLEngine extends Logging {
val rootDir =
_sparkConf.getOption("spark.repl.classdir").getOrElse(getLocalDir(_sparkConf))
val outputDir = Utils.createTempDir(prefix = "repl", root = rootDir)
_sparkConf.setIfMissing("spark.sql.legacy.castComplexTypesToString.enabled",
"true")
+ // SPARK-47911: we must set a value instead of leaving it as None,
otherwise, we will get a
+ // "Cannot mutate ReadOnlySQLConf" exception when task calling
HiveResult.getBinaryFormatter.
+ // Here we follow the HiveResult.getBinaryFormatter behavior to set it to
UTF8 if configuration
+ // is absent to reserve the legacy behavior for compatibility.
+ _sparkConf.setIfMissing("spark.sql.binaryOutputStyle", "UTF8")
_sparkConf.setIfMissing("spark.master", "local")
_sparkConf.set(
"spark.redaction.regex",
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala
index c5f322108..47e6351b2 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala
@@ -17,17 +17,43 @@
package org.apache.kyuubi.engine.spark.schema
+import java.lang.{Boolean => JBoolean}
+
import org.apache.spark.sql.execution.HiveResult
import org.apache.spark.sql.execution.HiveResult.TimeFormatters
import org.apache.spark.sql.types._
+import org.apache.kyuubi.util.reflect.DynMethods
+
object RowSet {
+ // SPARK-47911 (4.0.0) introduced it
+ type BinaryFormatter = Array[Byte] => String
+
+ def getBinaryFormatter: BinaryFormatter =
+ DynMethods.builder("getBinaryFormatter")
+ .impl(HiveResult.getClass) // for Spark 4.0 and later
+ .orNoop() // for Spark 3.5 and before
+ .buildChecked(HiveResult)
+ .invokeChecked[BinaryFormatter]()
+
def toHiveString(
valueAndType: (Any, DataType),
- nested: Boolean = false,
- timeFormatters: TimeFormatters): String = {
- HiveResult.toHiveString(valueAndType, nested, timeFormatters)
- }
-
+ nested: JBoolean = false,
+ timeFormatters: TimeFormatters,
+ binaryFormatter: BinaryFormatter): String =
+ DynMethods.builder("toHiveString")
+ .impl( // for Spark 3.5 and before
+ HiveResult.getClass,
+ classOf[(Any, DataType)],
+ classOf[Boolean],
+ classOf[TimeFormatters])
+ .impl( // for Spark 4.0 and later
+ HiveResult.getClass,
+ classOf[(Any, DataType)],
+ classOf[Boolean],
+ classOf[TimeFormatters],
+ classOf[BinaryFormatter])
+ .buildChecked(HiveResult)
+ .invokeChecked[String](valueAndType, nested, timeFormatters,
binaryFormatter)
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SparkTRowSetGenerator.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SparkTRowSetGenerator.scala
index 1d1b5ef6a..aa6c3383f 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SparkTRowSetGenerator.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SparkTRowSetGenerator.scala
@@ -27,8 +27,9 @@ import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
class SparkTRowSetGenerator
extends TRowSetGenerator[StructType, Row, DataType] {
- // reused time formatters in single RowSet generation, see KYUUBI-5811
+ // reused time formatters in single RowSet generation, see KYUUBI #5811
private val tf = HiveResult.getTimeFormatters
+ private val bf = RowSet.getBinaryFormatter
override def getColumnSizeFromSchemaType(schema: StructType): Int =
schema.length
@@ -51,6 +52,7 @@ class SparkTRowSetGenerator
case BinaryType => asByteArrayTColumn(rows, ordinal)
case _ =>
val timeFormatters = tf
+ val binaryFormatter = bf
asStringTColumn(
rows,
ordinal,
@@ -58,7 +60,8 @@ class SparkTRowSetGenerator
(row, ordinal) =>
RowSet.toHiveString(
getColumnAs[Any](row, ordinal) -> typ,
- timeFormatters = timeFormatters))
+ timeFormatters = timeFormatters,
+ binaryFormatter = binaryFormatter))
}
}
@@ -75,7 +78,11 @@ class SparkTRowSetGenerator
case _ => asStringTColumnValue(
row,
ordinal,
- rawValue => RowSet.toHiveString(rawValue -> types(ordinal).dataType,
timeFormatters = tf))
+ rawValue =>
+ RowSet.toHiveString(
+ rawValue -> types(ordinal).dataType,
+ timeFormatters = tf,
+ binaryFormatter = bf))
}
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
index b78c8b7a3..dda7bb4d0 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
@@ -103,7 +103,7 @@ object SparkDatasetHelper extends Logging {
// an udf to call `RowSet.toHiveString` on complex types(struct/array/map)
and timestamp type.
// TODO: reuse the timeFormatters on greater scale if possible,
- // recreating timeFormatters may cause performance issue, see
[KYUUBI#5811]
+ // recreate timeFormatters each time may cause performance issue,
see KYUUBI #5811
val toHiveStringUDF = udf[String, Row, String]((row, schemaDDL) => {
val dt = DataType.fromDDL(schemaDDL)
dt match {
@@ -111,22 +111,26 @@ object SparkDatasetHelper extends Logging {
RowSet.toHiveString(
(row, st),
nested = true,
- timeFormatters = HiveResult.getTimeFormatters)
+ timeFormatters = HiveResult.getTimeFormatters,
+ binaryFormatter = RowSet.getBinaryFormatter)
case StructType(Array(StructField(_, at: ArrayType, _, _))) =>
RowSet.toHiveString(
(row.toSeq.head, at),
nested = true,
- timeFormatters = HiveResult.getTimeFormatters)
+ timeFormatters = HiveResult.getTimeFormatters,
+ binaryFormatter = RowSet.getBinaryFormatter)
case StructType(Array(StructField(_, mt: MapType, _, _))) =>
RowSet.toHiveString(
(row.toSeq.head, mt),
nested = true,
- timeFormatters = HiveResult.getTimeFormatters)
+ timeFormatters = HiveResult.getTimeFormatters,
+ binaryFormatter = RowSet.getBinaryFormatter)
case StructType(Array(StructField(_, tt: TimestampType, _, _))) =>
RowSet.toHiveString(
(row.toSeq.head, tt),
nested = true,
- timeFormatters = HiveResult.getTimeFormatters)
+ timeFormatters = HiveResult.getTimeFormatters,
+ binaryFormatter = RowSet.getBinaryFormatter)
case _ =>
throw new UnsupportedOperationException
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/schema/RowSetSuite.scala
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/schema/RowSetSuite.scala
index 228bdcaf2..417e84f8a 100644
---
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/schema/RowSetSuite.scala
+++
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/schema/RowSetSuite.scala
@@ -168,7 +168,8 @@ class RowSetSuite extends KyuubiFunSuite {
case (b, i) =>
assert(b === RowSet.toHiveString(
Date.valueOf(s"2018-11-${i + 1}") -> DateType,
- timeFormatters = HiveResult.getTimeFormatters))
+ timeFormatters = HiveResult.getTimeFormatters,
+ binaryFormatter = RowSet.getBinaryFormatter))
}
val tsCol = cols.next().getStringVal
@@ -177,7 +178,8 @@ class RowSetSuite extends KyuubiFunSuite {
case (b, i) => assert(b ===
RowSet.toHiveString(
Timestamp.valueOf(s"2018-11-17 13:33:33.$i") -> TimestampType,
- timeFormatters = HiveResult.getTimeFormatters))
+ timeFormatters = HiveResult.getTimeFormatters,
+ binaryFormatter = RowSet.getBinaryFormatter))
}
val binCol = cols.next().getBinaryVal
@@ -191,7 +193,8 @@ class RowSetSuite extends KyuubiFunSuite {
case (b, 11) => assert(b === "NULL")
case (b, i) => assert(b === RowSet.toHiveString(
Array.fill(i)(java.lang.Double.valueOf(s"$i.$i")).toSeq ->
ArrayType(DoubleType),
- timeFormatters = HiveResult.getTimeFormatters))
+ timeFormatters = HiveResult.getTimeFormatters,
+ binaryFormatter = RowSet.getBinaryFormatter))
}
val mapCol = cols.next().getStringVal
@@ -199,7 +202,8 @@ class RowSetSuite extends KyuubiFunSuite {
case (b, 11) => assert(b === "NULL")
case (b, i) => assert(b === RowSet.toHiveString(
Map(i -> java.lang.Double.valueOf(s"$i.$i")) -> MapType(IntegerType,
DoubleType),
- timeFormatters = HiveResult.getTimeFormatters))
+ timeFormatters = HiveResult.getTimeFormatters,
+ binaryFormatter = RowSet.getBinaryFormatter))
}
val intervalCol = cols.next().getStringVal
@@ -250,7 +254,8 @@ class RowSetSuite extends KyuubiFunSuite {
assert(r8.get(13).getStringVal.getValue ===
RowSet.toHiveString(
Map(7 -> 7.7d) -> MapType(IntegerType, DoubleType),
- timeFormatters = HiveResult.getTimeFormatters))
+ timeFormatters = HiveResult.getTimeFormatters,
+ binaryFormatter = RowSet.getBinaryFormatter))
val r9 = iter.next().getColVals
assert(r9.get(14).getStringVal.getValue === new CalendarInterval(8, 8,
8).toString)