This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new eb667f9360a [SPARK-42074][SQL] Enable `KryoSerializer` in `TPCDSQueryBenchmark` to enforce SQL class registration
eb667f9360a is described below

commit eb667f9360acc43a3dcdcd9cfe54fc97942805e9
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sun Jan 15 16:15:04 2023 -0800

    [SPARK-42074][SQL] Enable `KryoSerializer` in `TPCDSQueryBenchmark` to enforce SQL class registration
    
    ### What changes were proposed in this pull request?
    
    This PR aims to enable `KryoSerializer` in `TPCDSQueryBenchmark` to enforce built-in SQL class registration.
    
    ### Why are the changes needed?
    
    GitHub Actions CI will ensure that all new SQL-related classes are registered.
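
    For context, `spark.kryo.registrationRequired=true` makes Kryo fail fast on any class that was not explicitly registered, which is what surfaces missing SQL classes in CI. A minimal sketch of that behavior (the `Unregistered` case class and `RegistrationDemo` object are hypothetical illustrations, not part of this patch):

    ```
    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.KryoSerializer

    // Hypothetical class for illustration only; it is NOT on Kryo's
    // registration list.
    case class Unregistered(x: Int)

    object RegistrationDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .set("spark.kryo.registrationRequired", "true")
        val ser = new KryoSerializer(conf).newInstance()

        // With registrationRequired=true, serializing an unregistered class
        // throws instead of silently falling back to writing fully-qualified
        // class names into the stream.
        try ser.serialize(Unregistered(1))
        catch { case e: Exception => println(s"registration enforced: ${e.getMessage}") }
      }
    }
    ```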
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass the CIs. I also manually tested as follows.
    
    ```
    $ build/sbt "sql/Test/runMain org.apache.spark.sql.execution.benchmark.TPCDSQueryBenchmark --data-location /tmp/tpcds-sf-1"
    ...
    [success] Total time: 2050 s (34:10), completed Jan 15, 2023 4:06:12 PM
    ```
    
    Closes #39584 from dongjoon-hyun/SPARK-42074.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../apache/spark/serializer/KryoSerializer.scala   | 32 ++++++++++++++++++++++
 .../execution/benchmark/TPCDSQueryBenchmark.scala  |  2 ++
 2 files changed, 34 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index f860b2a0865..5e2bdd8085e 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -46,6 +46,7 @@ import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatu
 import org.apache.spark.storage._
 import org.apache.spark.util.{BoundedPriorityQueue, ByteBufferInputStream, SerializableConfiguration, SerializableJobConf, Utils}
 import org.apache.spark.util.collection.CompactBuffer
+import org.apache.spark.util.io.ChunkedByteBuffer
 
 /**
  * A Spark serializer that uses the <a href="https://code.google.com/p/kryo/";>
@@ -220,6 +221,7 @@ class KryoSerializer(conf: SparkConf)
     kryo.register(Utils.classForName("scala.collection.immutable.$colon$colon"))
     kryo.register(Utils.classForName("scala.collection.immutable.Map$EmptyMap$"))
     kryo.register(Utils.classForName("scala.math.Ordering$Reverse"))
+    kryo.register(Utils.classForName("scala.reflect.ClassTag$GenericClassTag"))
     kryo.register(classOf[ArrayBuffer[Any]])
     kryo.register(classOf[Array[Array[Byte]]])
 
@@ -466,9 +468,11 @@ private[serializer] object KryoSerializer {
   // Commonly used classes.
   private val toRegister: Seq[Class[_]] = Seq(
     ByteBuffer.allocate(1).getClass,
+    classOf[Array[ByteBuffer]],
     classOf[StorageLevel],
     classOf[CompressedMapStatus],
     classOf[HighlyCompressedMapStatus],
+    classOf[ChunkedByteBuffer],
     classOf[CompactBuffer[_]],
     classOf[BlockManagerId],
     classOf[Array[Boolean]],
@@ -503,9 +507,37 @@ private[serializer] object KryoSerializer {
   // SQL / ML / MLlib classes once and then re-use that filtered list in newInstance() calls.
   private lazy val loadableSparkClasses: Seq[Class[_]] = {
     Seq(
+      "org.apache.spark.sql.catalyst.expressions.BoundReference",
+      "org.apache.spark.sql.catalyst.expressions.SortOrder",
+      "[Lorg.apache.spark.sql.catalyst.expressions.SortOrder;",
+      "org.apache.spark.sql.catalyst.InternalRow",
+      "org.apache.spark.sql.catalyst.InternalRow$",
+      "[Lorg.apache.spark.sql.catalyst.InternalRow;",
       "org.apache.spark.sql.catalyst.expressions.UnsafeRow",
       "org.apache.spark.sql.catalyst.expressions.UnsafeArrayData",
       "org.apache.spark.sql.catalyst.expressions.UnsafeMapData",
+      "org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering",
+      "org.apache.spark.sql.catalyst.expressions.Ascending$",
+      "org.apache.spark.sql.catalyst.expressions.NullsFirst$",
+      "org.apache.spark.sql.catalyst.trees.Origin",
+      "org.apache.spark.sql.types.IntegerType",
+      "org.apache.spark.sql.types.IntegerType$",
+      "org.apache.spark.sql.types.LongType$",
+      "org.apache.spark.sql.types.Metadata",
+      "org.apache.spark.sql.types.StringType$",
+      "org.apache.spark.sql.types.StructField",
+      "[Lorg.apache.spark.sql.types.StructField;",
+      "org.apache.spark.sql.types.StructType",
+      "[Lorg.apache.spark.sql.types.StructType;",
+      "org.apache.spark.sql.types.DateType$",
+      "org.apache.spark.sql.types.DecimalType",
+      "org.apache.spark.sql.types.Decimal$DecimalAsIfIntegral$",
+      "org.apache.spark.sql.types.Decimal$DecimalIsFractional$",
+      "org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTaskResult",
+      "org.apache.spark.sql.execution.joins.EmptyHashedRelation$",
+      "org.apache.spark.sql.execution.joins.LongHashedRelation",
+      "org.apache.spark.sql.execution.joins.LongToUnsafeRowMap",
+      "org.apache.spark.sql.execution.joins.UnsafeHashedRelation",
 
       "org.apache.spark.ml.attribute.Attribute",
       "org.apache.spark.ml.attribute.AttributeGroup",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
index be0b89bc6bc..ab63ebb1050 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
@@ -57,6 +57,8 @@ object TPCDSQueryBenchmark extends SqlBasedBenchmark with Logging {
       .set("spark.executor.memory", "3g")
       .set("spark.sql.autoBroadcastJoinThreshold", (20 * 1024 * 1024).toString)
       .set("spark.sql.crossJoin.enabled", "true")
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .set("spark.kryo.registrationRequired", "true")
 
     SparkSession.builder.config(conf).getOrCreate()
   }
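
For reference, the entries added to `loadableSparkClasses` above are string class names rather than `classOf[...]` references because the `core` module does not compile against the SQL and ML modules; each name is resolved reflectively and skipped when the class is not on the classpath. A simplified sketch of that pattern (names and error handling condensed from the real code in `KryoSerializer`):

```
// Resolve optional classes by name so core need not depend on
// sql/ml at compile time; skip anything missing from the classpath.
val optionalClassNames = Seq(
  "org.apache.spark.sql.catalyst.expressions.UnsafeRow",
  "org.apache.spark.ml.attribute.Attribute")

val loadable: Seq[Class[_]] = optionalClassNames.flatMap { name =>
  try {
    Some(Class.forName(name, false, Thread.currentThread().getContextClassLoader))
  } catch {
    case _: ClassNotFoundException => None
  }
}
```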

