This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.9
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.9 by this push:
     new ca9fa49b3 [KYUUBI #6404] Fix HiveResult.toHiveString compatibility for Spark 4.0
ca9fa49b3 is described below

commit ca9fa49b37a32b451466b577fdb1065fbca1b1e7
Author: Cheng Pan <[email protected]>
AuthorDate: Wed May 22 15:07:36 2024 +0800

    [KYUUBI #6404] Fix HiveResult.toHiveString compatibility for Spark 4.0
    
    # :mag: Description
    
    SPARK-47911 introduced a breaking change to `HiveResult.toHiveString`; here we use reflection to restore compatibility.
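    
    For illustration, a minimal, hedged sketch of the idea using plain `java.lang.reflect` rather than Kyuubi's `DynMethods` helper (which the diff below actually uses); `toHiveStringCompat` is a hypothetical name, and the signatures follow those quoted in the compilation error:
    
    ```
    import org.apache.spark.sql.execution.HiveResult
    import org.apache.spark.sql.execution.HiveResult.TimeFormatters
    import org.apache.spark.sql.types.DataType
    
    // Hypothetical shim: resolve whichever toHiveString overload the running
    // Spark provides, instead of binding the signature at compile time.
    def toHiveStringCompat(
        valueAndType: (Any, DataType),
        nested: Boolean,
        timeFormatters: TimeFormatters): String = {
      // Prefer the overload with the most parameters (SPARK-47911 adds a 4th).
      val method = HiveResult.getClass.getMethods
        .filter(_.getName == "toHiveString")
        .maxBy(_.getParameterCount)
      val args: Seq[AnyRef] = method.getParameterCount match {
        case 3 => // Spark 3.5 and earlier
          Seq(valueAndType, Boolean.box(nested), timeFormatters)
        case 4 => // Spark 4.0 and later: also pass a binary formatter
          val bf = HiveResult.getClass.getMethod("getBinaryFormatter").invoke(HiveResult)
          Seq(valueAndType, Boolean.box(nested), timeFormatters, bf)
        case n => throw new IllegalStateException(s"unexpected toHiveString arity: $n")
      }
      method.invoke(HiveResult, args: _*).asInstanceOf[String]
    }
    ```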
    
    ## Types of changes :bookmark:
    
    - [x] Bugfix (non-breaking change which fixes an issue)
    - [ ] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing functionality to change)
    
    ## Test Plan 🧪
    
    ```
    build/mvn clean install -Pscala-2.13 -Pspark-master \
      -pl externals/kyuubi-spark-sql-engine -am \
      -Dtest=none -DwildcardSuites=org.apache.kyuubi.engine.spark.schema.RowSetSuite
    ```
    
    before - compilation error
    ```
    [INFO] --- scala-maven-plugin:4.8.0:compile (scala-compile-first) @ kyuubi-spark-sql-engine_2.13 ---
    ...
    [ERROR] [Error] /home/kyuubi/apache-kyuubi/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala:30: not enough arguments for method toHiveString: (a: (Any, org.apache.spark.sql.types.DataType), nested: Boolean, formatters: org.apache.spark.sql.execution.HiveResult.TimeFormatters, binaryFormatter: org.apache.spark.sql.execution.HiveResult.BinaryFormatter): String.
    Unspecified value parameter binaryFormatter.
    ```
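    
    For comparison, the pre-patch call site (simplified from the removed lines in the diff below) binds the three-argument signature at compile time, so Spark 4.0's added `binaryFormatter` parameter is left unspecified:
    
    ```
    // Spark 3.x-era direct call: compiles against Spark 3.5 and earlier only.
    HiveResult.toHiveString(valueAndType, nested, timeFormatters)
    ```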
    
    after - UT pass
    ```
    [INFO] --- scalatest-maven-plugin:2.2.0:test (test) @ kyuubi-spark-sql-engine_2.13 ---
    [INFO] ScalaTest report directory: /home/kyuubi/apache-kyuubi/externals/kyuubi-spark-sql-engine/target/surefire-reports
    Discovery starting.
    Discovery completed in 1 second, 959 milliseconds.
    Run starting. Expected test count is: 3
    RowSetSuite:
    - column based set
    - row based set
    - to row set
    Run completed in 2 seconds, 712 milliseconds.
    Total number of tests run: 3
    Suites: completed 2, aborted 0
    Tests: succeeded 3, failed 0, canceled 0, ignored 0, pending 0
    All tests passed.
    ```
    
    ---
    
    # Checklist 📝
    
    - [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    **Be nice. Be informative.**
    
    Closes #6404 from pan3793/hive-string.
    
    Closes #6404
    
    6b3c743eb [Cheng Pan] fix breaking change of HiveResult.toHiveString caused by SPARK-47911
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
    (cherry picked from commit 5b592d07cab9c77cac136b496e3649f511627e57)
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../kyuubi/engine/spark/SparkSQLEngine.scala       |  5 +++
 .../apache/kyuubi/engine/spark/schema/RowSet.scala | 36 +++++++++++++++++++---
 .../spark/schema/SparkTRowSetGenerator.scala       | 13 ++++++--
 .../spark/sql/kyuubi/SparkDatasetHelper.scala      | 14 ++++++---
 .../kyuubi/engine/spark/schema/RowSetSuite.scala   | 15 ++++++---
 5 files changed, 65 insertions(+), 18 deletions(-)

diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index 5ed67963b..5b1868160 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -262,6 +262,11 @@ object SparkSQLEngine extends Logging {
     val rootDir = _sparkConf.getOption("spark.repl.classdir").getOrElse(getLocalDir(_sparkConf))
     val outputDir = Utils.createTempDir(prefix = "repl", root = rootDir)
     _sparkConf.setIfMissing("spark.sql.legacy.castComplexTypesToString.enabled", "true")
+    // SPARK-47911: we must set a value instead of leaving it as None, otherwise we will get a
+    // "Cannot mutate ReadOnlySQLConf" exception when tasks call HiveResult.getBinaryFormatter.
+    // Here we follow the HiveResult.getBinaryFormatter behavior and set it to UTF8 when the
+    // configuration is absent, to preserve the legacy behavior for compatibility.
+    _sparkConf.setIfMissing("spark.sql.binaryOutputStyle", "UTF8")
     _sparkConf.setIfMissing("spark.master", "local")
     _sparkConf.set(
       "spark.redaction.regex",
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala
index c5f322108..47e6351b2 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala
@@ -17,17 +17,43 @@
 
 package org.apache.kyuubi.engine.spark.schema
 
+import java.lang.{Boolean => JBoolean}
+
 import org.apache.spark.sql.execution.HiveResult
 import org.apache.spark.sql.execution.HiveResult.TimeFormatters
 import org.apache.spark.sql.types._
 
+import org.apache.kyuubi.util.reflect.DynMethods
+
 object RowSet {
 
+  // SPARK-47911 (4.0.0) introduced it
+  type BinaryFormatter = Array[Byte] => String
+
+  def getBinaryFormatter: BinaryFormatter =
+    DynMethods.builder("getBinaryFormatter")
+      .impl(HiveResult.getClass) // for Spark 4.0 and later
+      .orNoop() // for Spark 3.5 and before
+      .buildChecked(HiveResult)
+      .invokeChecked[BinaryFormatter]()
+
   def toHiveString(
       valueAndType: (Any, DataType),
-      nested: Boolean = false,
-      timeFormatters: TimeFormatters): String = {
-    HiveResult.toHiveString(valueAndType, nested, timeFormatters)
-  }
-
+      nested: JBoolean = false,
+      timeFormatters: TimeFormatters,
+      binaryFormatter: BinaryFormatter): String =
+    DynMethods.builder("toHiveString")
+      .impl( // for Spark 3.5 and before
+        HiveResult.getClass,
+        classOf[(Any, DataType)],
+        classOf[Boolean],
+        classOf[TimeFormatters])
+      .impl( // for Spark 4.0 and later
+        HiveResult.getClass,
+        classOf[(Any, DataType)],
+        classOf[Boolean],
+        classOf[TimeFormatters],
+        classOf[BinaryFormatter])
+      .buildChecked(HiveResult)
+      .invokeChecked[String](valueAndType, nested, timeFormatters, binaryFormatter)
 }
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SparkTRowSetGenerator.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SparkTRowSetGenerator.scala
index 1d1b5ef6a..aa6c3383f 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SparkTRowSetGenerator.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SparkTRowSetGenerator.scala
@@ -27,8 +27,9 @@ import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
 class SparkTRowSetGenerator
   extends TRowSetGenerator[StructType, Row, DataType] {
 
-  // reused time formatters in single RowSet generation, see KYUUBI-5811
+  // reused time formatters in single RowSet generation, see KYUUBI #5811
   private val tf = HiveResult.getTimeFormatters
+  private val bf = RowSet.getBinaryFormatter
 
   override def getColumnSizeFromSchemaType(schema: StructType): Int = schema.length
 
@@ -51,6 +52,7 @@ class SparkTRowSetGenerator
       case BinaryType => asByteArrayTColumn(rows, ordinal)
       case _ =>
         val timeFormatters = tf
+        val binaryFormatter = bf
         asStringTColumn(
           rows,
           ordinal,
@@ -58,7 +60,8 @@ class SparkTRowSetGenerator
           (row, ordinal) =>
             RowSet.toHiveString(
               getColumnAs[Any](row, ordinal) -> typ,
-              timeFormatters = timeFormatters))
+              timeFormatters = timeFormatters,
+              binaryFormatter = binaryFormatter))
     }
   }
 
@@ -75,7 +78,11 @@ class SparkTRowSetGenerator
       case _ => asStringTColumnValue(
           row,
           ordinal,
-          rawValue => RowSet.toHiveString(rawValue -> types(ordinal).dataType, timeFormatters = tf))
+          rawValue =>
+            RowSet.toHiveString(
+              rawValue -> types(ordinal).dataType,
+              timeFormatters = tf,
+              binaryFormatter = bf))
     }
   }
 
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
index 082c9ff70..8516ffc0f 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
@@ -108,7 +108,7 @@ object SparkDatasetHelper extends Logging {
 
     // an udf to call `RowSet.toHiveString` on complex types(struct/array/map) and timestamp type.
     // TODO: reuse the timeFormatters on greater scale if possible,
-    //  recreating timeFormatters may cause performance issue, see [KYUUBI#5811]
+    //       recreating timeFormatters each time may cause performance issues, see KYUUBI #5811
     val toHiveStringUDF = udf[String, Row, String]((row, schemaDDL) => {
       val dt = DataType.fromDDL(schemaDDL)
       dt match {
@@ -116,22 +116,26 @@ object SparkDatasetHelper extends Logging {
           RowSet.toHiveString(
             (row, st),
             nested = true,
-            timeFormatters = HiveResult.getTimeFormatters)
+            timeFormatters = HiveResult.getTimeFormatters,
+            binaryFormatter = RowSet.getBinaryFormatter)
         case StructType(Array(StructField(_, at: ArrayType, _, _))) =>
           RowSet.toHiveString(
             (row.toSeq.head, at),
             nested = true,
-            timeFormatters = HiveResult.getTimeFormatters)
+            timeFormatters = HiveResult.getTimeFormatters,
+            binaryFormatter = RowSet.getBinaryFormatter)
         case StructType(Array(StructField(_, mt: MapType, _, _))) =>
           RowSet.toHiveString(
             (row.toSeq.head, mt),
             nested = true,
-            timeFormatters = HiveResult.getTimeFormatters)
+            timeFormatters = HiveResult.getTimeFormatters,
+            binaryFormatter = RowSet.getBinaryFormatter)
         case StructType(Array(StructField(_, tt: TimestampType, _, _))) =>
           RowSet.toHiveString(
             (row.toSeq.head, tt),
             nested = true,
-            timeFormatters = HiveResult.getTimeFormatters)
+            timeFormatters = HiveResult.getTimeFormatters,
+            binaryFormatter = RowSet.getBinaryFormatter)
         case _ =>
           throw new UnsupportedOperationException
       }
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/schema/RowSetSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/schema/RowSetSuite.scala
index 228bdcaf2..417e84f8a 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/schema/RowSetSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/schema/RowSetSuite.scala
@@ -168,7 +168,8 @@ class RowSetSuite extends KyuubiFunSuite {
       case (b, i) =>
         assert(b === RowSet.toHiveString(
           Date.valueOf(s"2018-11-${i + 1}") -> DateType,
-          timeFormatters = HiveResult.getTimeFormatters))
+          timeFormatters = HiveResult.getTimeFormatters,
+          binaryFormatter = RowSet.getBinaryFormatter))
     }
 
     val tsCol = cols.next().getStringVal
@@ -177,7 +178,8 @@ class RowSetSuite extends KyuubiFunSuite {
       case (b, i) => assert(b ===
           RowSet.toHiveString(
             Timestamp.valueOf(s"2018-11-17 13:33:33.$i") -> TimestampType,
-            timeFormatters = HiveResult.getTimeFormatters))
+            timeFormatters = HiveResult.getTimeFormatters,
+            binaryFormatter = RowSet.getBinaryFormatter))
     }
 
     val binCol = cols.next().getBinaryVal
@@ -191,7 +193,8 @@ class RowSetSuite extends KyuubiFunSuite {
       case (b, 11) => assert(b === "NULL")
       case (b, i) => assert(b === RowSet.toHiveString(
           Array.fill(i)(java.lang.Double.valueOf(s"$i.$i")).toSeq -> ArrayType(DoubleType),
-          timeFormatters = HiveResult.getTimeFormatters))
+          timeFormatters = HiveResult.getTimeFormatters,
+          binaryFormatter = RowSet.getBinaryFormatter))
     }
 
     val mapCol = cols.next().getStringVal
@@ -199,7 +202,8 @@ class RowSetSuite extends KyuubiFunSuite {
       case (b, 11) => assert(b === "NULL")
       case (b, i) => assert(b === RowSet.toHiveString(
           Map(i -> java.lang.Double.valueOf(s"$i.$i")) -> MapType(IntegerType, DoubleType),
-          timeFormatters = HiveResult.getTimeFormatters))
+          timeFormatters = HiveResult.getTimeFormatters,
+          binaryFormatter = RowSet.getBinaryFormatter))
     }
 
     val intervalCol = cols.next().getStringVal
@@ -250,7 +254,8 @@ class RowSetSuite extends KyuubiFunSuite {
     assert(r8.get(13).getStringVal.getValue ===
       RowSet.toHiveString(
         Map(7 -> 7.7d) -> MapType(IntegerType, DoubleType),
-        timeFormatters = HiveResult.getTimeFormatters))
+        timeFormatters = HiveResult.getTimeFormatters,
+        binaryFormatter = RowSet.getBinaryFormatter))
 
     val r9 = iter.next().getColVals
     assert(r9.get(14).getStringVal.getValue === new CalendarInterval(8, 8, 8).toString)
