This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new eb667f9360a [SPARK-42074][SQL] Enable `KryoSerializer` in `TPCDSQueryBenchmark` to enforce SQL class registration
eb667f9360a is described below
commit eb667f9360acc43a3dcdcd9cfe54fc97942805e9
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sun Jan 15 16:15:04 2023 -0800
[SPARK-42074][SQL] Enable `KryoSerializer` in `TPCDSQueryBenchmark` to enforce SQL class registration
### What changes were proposed in this pull request?
This PR aims to enable `KryoSerializer` in `TPCDSQueryBenchmark` to enforce built-in SQL class registration.
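Concretely, the patch adds two settings to the benchmark's `SparkConf`. A minimal standalone sketch of the same configuration (the `local[*]` master and app name here are illustrative, not part of this patch):
```
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
  .setMaster("local[*]")                // illustrative only
  .setAppName("kryo-registration-demo") // illustrative only
  // The two settings this patch adds to TPCDSQueryBenchmark:
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")

val spark = SparkSession.builder.config(conf).getOrCreate()
```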
### Why are the changes needed?
GitHub Actions CI will ensure that all new SQL-related classes are registered.
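For context, with `spark.kryo.registrationRequired=true` Kryo refuses to serialize any class that has not been registered, so a benchmark run fails fast whenever a new SQL class is missing from `KryoSerializer`'s registration list. A minimal sketch of that failure mode (the `NotRegistered` case class is hypothetical):
```
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

case class NotRegistered(x: Int) // hypothetical, never registered with Kryo

val conf = new SparkConf().set("spark.kryo.registrationRequired", "true")
val ser = new KryoSerializer(conf).newInstance()

ser.serialize(Array[Byte](1, 2, 3)) // fine: byte arrays are registered by default
ser.serialize(NotRegistered(1))     // throws IllegalArgumentException
                                    // ("Class is not registered: NotRegistered")
```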
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the CIs. I also manually tested like the following.
```
$ build/sbt "sql/Test/runMain
org.apache.spark.sql.execution.benchmark.TPCDSQueryBenchmark --data-location
/tmp/tpcds-sf-1"
...
[success] Total time: 2050 s (34:10), completed Jan 15, 2023 4:06:12 PM
```
Closes #39584 from dongjoon-hyun/SPARK-42074.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../apache/spark/serializer/KryoSerializer.scala | 32 ++++++++++++++++++++++
.../execution/benchmark/TPCDSQueryBenchmark.scala | 2 ++
2 files changed, 34 insertions(+)
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index f860b2a0865..5e2bdd8085e 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -46,6 +46,7 @@ import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatu
import org.apache.spark.storage._
import org.apache.spark.util.{BoundedPriorityQueue, ByteBufferInputStream,
SerializableConfiguration, SerializableJobConf, Utils}
import org.apache.spark.util.collection.CompactBuffer
+import org.apache.spark.util.io.ChunkedByteBuffer
/**
* A Spark serializer that uses the <a href="https://code.google.com/p/kryo/">
@@ -220,6 +221,7 @@ class KryoSerializer(conf: SparkConf)
kryo.register(Utils.classForName("scala.collection.immutable.$colon$colon"))
kryo.register(Utils.classForName("scala.collection.immutable.Map$EmptyMap$"))
kryo.register(Utils.classForName("scala.math.Ordering$Reverse"))
+ kryo.register(Utils.classForName("scala.reflect.ClassTag$GenericClassTag"))
kryo.register(classOf[ArrayBuffer[Any]])
kryo.register(classOf[Array[Array[Byte]]])
@@ -466,9 +468,11 @@ private[serializer] object KryoSerializer {
// Commonly used classes.
private val toRegister: Seq[Class[_]] = Seq(
ByteBuffer.allocate(1).getClass,
+ classOf[Array[ByteBuffer]],
classOf[StorageLevel],
classOf[CompressedMapStatus],
classOf[HighlyCompressedMapStatus],
+ classOf[ChunkedByteBuffer],
classOf[CompactBuffer[_]],
classOf[BlockManagerId],
classOf[Array[Boolean]],
@@ -503,9 +507,37 @@ private[serializer] object KryoSerializer {
// SQL / ML / MLlib classes once and then re-use that filtered list in
newInstance() calls.
private lazy val loadableSparkClasses: Seq[Class[_]] = {
Seq(
+ "org.apache.spark.sql.catalyst.expressions.BoundReference",
+ "org.apache.spark.sql.catalyst.expressions.SortOrder",
+ "[Lorg.apache.spark.sql.catalyst.expressions.SortOrder;",
+ "org.apache.spark.sql.catalyst.InternalRow",
+ "org.apache.spark.sql.catalyst.InternalRow$",
+ "[Lorg.apache.spark.sql.catalyst.InternalRow;",
"org.apache.spark.sql.catalyst.expressions.UnsafeRow",
"org.apache.spark.sql.catalyst.expressions.UnsafeArrayData",
"org.apache.spark.sql.catalyst.expressions.UnsafeMapData",
+ "org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering",
+ "org.apache.spark.sql.catalyst.expressions.Ascending$",
+ "org.apache.spark.sql.catalyst.expressions.NullsFirst$",
+ "org.apache.spark.sql.catalyst.trees.Origin",
+ "org.apache.spark.sql.types.IntegerType",
+ "org.apache.spark.sql.types.IntegerType$",
+ "org.apache.spark.sql.types.LongType$",
+ "org.apache.spark.sql.types.Metadata",
+ "org.apache.spark.sql.types.StringType$",
+ "org.apache.spark.sql.types.StructField",
+ "[Lorg.apache.spark.sql.types.StructField;",
+ "org.apache.spark.sql.types.StructType",
+ "[Lorg.apache.spark.sql.types.StructType;",
+ "org.apache.spark.sql.types.DateType$",
+ "org.apache.spark.sql.types.DecimalType",
+ "org.apache.spark.sql.types.Decimal$DecimalAsIfIntegral$",
+ "org.apache.spark.sql.types.Decimal$DecimalIsFractional$",
+ "org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTaskResult",
+ "org.apache.spark.sql.execution.joins.EmptyHashedRelation$",
+ "org.apache.spark.sql.execution.joins.LongHashedRelation",
+ "org.apache.spark.sql.execution.joins.LongToUnsafeRowMap",
+ "org.apache.spark.sql.execution.joins.UnsafeHashedRelation",
"org.apache.spark.ml.attribute.Attribute",
"org.apache.spark.ml.attribute.AttributeGroup",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
index be0b89bc6bc..ab63ebb1050 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
@@ -57,6 +57,8 @@ object TPCDSQueryBenchmark extends SqlBasedBenchmark with Logging {
.set("spark.executor.memory", "3g")
.set("spark.sql.autoBroadcastJoinThreshold", (20 * 1024 * 1024).toString)
.set("spark.sql.crossJoin.enabled", "true")
+ .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .set("spark.kryo.registrationRequired", "true")
SparkSession.builder.config(conf).getOrCreate()
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]