This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 398435fcd6e [SPARK-43898][CORE] Automatically register `immutable.ArraySeq$ofRef` to `KryoSerializer` for Scala 2.13
398435fcd6e is described below
commit 398435fcd6eb8623219c8d6c3e5966463ebe4429
Author: yangjie01 <[email protected]>
AuthorDate: Wed May 31 19:23:30 2023 -0700
[SPARK-43898][CORE] Automatically register `immutable.ArraySeq$ofRef` to `KryoSerializer` for Scala 2.13
```
SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt clean "sql/Test/runMain org.apache.spark.sql.execution.benchmark.TPCDSQueryBenchmark --data-location /Users/yangjie01/Tools/tpcds-sf-1 --query-filter q17" -Pscala-2.13
```
### What changes were proposed in this pull request?
This PR aims to automatically register `immutable.ArraySeq$ofRef` with `KryoSerializer` for Scala 2.13, so that `TPCDSQueryBenchmark` can run successfully on Scala 2.13.
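For context, the only user-side workaround before this change was to register the class by hand. A minimal sketch of that workaround, using standard Kryo-related Spark settings (not part of this patch):

```scala
import org.apache.spark.SparkConf

// Sketch: manually registering the Scala 2.13 wrapper class that this
// patch now registers automatically. All keys are standard Spark settings.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")
  .set("spark.kryo.classesToRegister", "scala.collection.immutable.ArraySeq$ofRef")
```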
### Why are the changes needed?
Scala 2.13 introduced `scala.collection.immutable.ArraySeq$ofRef`, but it has not been registered with `KryoSerializer`. When `TPCDSQueryBenchmark` is run with `KryoSerializer` and `spark.kryo.registrationRequired` enabled (the default behavior since SPARK-42074), the following error occurs:
```
Error: Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Failed to serialize task 741, not attempting to retry it. Exception during serialization: java.io.IOException: java.lang.IllegalArgumentException: Class is not registered: scala.collection.immutable.ArraySeq$ofRef
Note: To register this class use: kryo.register(scala.collection.immutable.ArraySeq$ofRef.class);
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2815)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2751)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2750)
  at scala.collection.immutable.List.foreach(List.scala:333)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2750)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1218)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1218)
  at scala.Option.foreach(Option.scala:437)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1218)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3014)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2953)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2942)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:983)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2285)
  at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:385)
  at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:359)
  at org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExec.writeWithV2(WriteToDataSourceV2Exec.scala:243)
  at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:337)
  at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:336)
  at org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExec.run(WriteToDataSourceV2Exec.scala:243)
  at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
  at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
  at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:825)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
  at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
  at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:858)
  at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:318)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
  at org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark$DatasetToBenchmark.noop(SqlBasedBenchmark.scala:70)
  at org.apache.spark.sql.execution.benchmark.TPCDSQueryBenchmark$.$anonfun$runTpcdsQueries$5(TPCDSQueryBenchmark.scala:111)
  at org.apache.spark.benchmark.Benchmark.$anonfun$addCase$1(Benchmark.scala:77)
  at org.apache.spark.benchmark.Benchmark.$anonfun$addCase$1$adapted(Benchmark.scala:75)
  at org.apache.spark.benchmark.Benchmark.measure(Benchmark.scala:140)
  at org.apache.spark.benchmark.Benchmark.$anonfun$run$1(Benchmark.scala:106)
  at scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:100)
  at scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:87)
  at scala.collection.mutable.ArrayBuffer.map(ArrayBuffer.scala:43)
  at org.apache.spark.benchmark.Benchmark.run(Benchmark.scala:104)
  at org.apache.spark.sql.execution.benchmark.TPCDSQueryBenchmark$.$anonfun$runTpcdsQueries$1(TPCDSQueryBenchmark.scala:113)
  at org.apache.spark.sql.execution.benchmark.TPCDSQueryBenchmark$.$anonfun$runTpcdsQueries$1$adapted(TPCDSQueryBenchmark.scala:91)
  at scala.collection.immutable.List.foreach(List.scala:333)
  at org.apache.spark.sql.execution.benchmark.TPCDSQueryBenchmark$.runTpcdsQueries(TPCDSQueryBenchmark.scala:91)
  at org.apache.spark.sql.execution.benchmark.TPCDSQueryBenchmark$.runBenchmarkSuite(TPCDSQueryBenchmark.scala:185)
  at org.apache.spark.benchmark.BenchmarkBase.main(BenchmarkBase.scala:72)
  at org.apache.spark.sql.execution.benchmark.TPCDSQueryBenchmark.main(TPCDSQueryBenchmark.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.spark.benchmark.Benchmarks$.$anonfun$main$7(Benchmarks.scala:128)
  at scala.collection.ArrayOps$.foreach$extension(ArrayOps.scala:1328)
  at org.apache.spark.benchmark.Benchmarks$.main(Benchmarks.scala:91)
  at org.apache.spark.benchmark.Benchmarks.main(Benchmarks.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
  at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1025)
  at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192)
  at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215)
  at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
  at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1116)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1125)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
```
`KryoSerializer` cannot serialize these tasks on Scala 2.13 because `immutable.ArraySeq$ofRef` is missing from its registration list, so this PR adds that registration for Scala 2.13.
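To make the failure concrete, here is a minimal repro sketch (not part of the patch; assumes Scala 2.13, where wrapping an array of references yields `ArraySeq$ofRef`):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

val conf = new SparkConf(false).set("spark.kryo.registrationRequired", "true")
val ser = new KryoSerializer(conf).newInstance()

// On Scala 2.13, wrapping an Array[AnyRef] produces immutable.ArraySeq$ofRef.
val seq: Seq[String] = scala.collection.immutable.ArraySeq.unsafeWrapArray(Array("a", "b"))

// Before this patch: IllegalArgumentException("Class is not registered: ...ArraySeq$ofRef").
// With the registration in place, the round trip succeeds.
val back = ser.deserialize[Seq[String]](ser.serialize(seq))
assert(back == seq)
```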
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
- Added a new test case to check the serde round trip of `scala.collection.immutable.ArraySeq$ofRef` for Scala 2.13.
- Checked `TPCDSQueryBenchmark` on GitHub Actions:
**Before**
https://github.com/LuciferYang/spark/actions/runs/5129288422/jobs/9226895410
<img width="1290" alt="image"
src="https://github.com/apache/spark/assets/1475305/4effdc9e-153c-4126-bbaf-3206ab98579e">
**After**
https://github.com/LuciferYang/spark/actions/runs/5132285127
<img width="1225" alt="image"
src="https://github.com/apache/spark/assets/1475305/89c4e2ac-848b-4acb-9d01-1bb4b86d09a1">
Closes #41402 from LuciferYang/SPARK-43898.
Authored-by: yangjie01 <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../scala/org/apache/spark/serializer/KryoSerializer.scala | 4 ++++
.../org/apache/spark/serializer/KryoSerializerSuite.scala | 11 +++++++++++
2 files changed, 15 insertions(+)
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 2bbc15c490c..826d6789f88 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -26,6 +26,7 @@ import javax.annotation.Nullable
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
+import scala.util.Properties
 import scala.util.control.NonFatal
 
 import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => KryoClassSerializer}
@@ -219,6 +220,9 @@ class KryoSerializer(conf: SparkConf)
     kryo.register(None.getClass)
     kryo.register(Nil.getClass)
+    if (Properties.versionNumberString.startsWith("2.13")) {
+      kryo.register(Utils.classForName("scala.collection.immutable.ArraySeq$ofRef"))
+    }
     kryo.register(Utils.classForName("scala.collection.immutable.$colon$colon"))
     kryo.register(Utils.classForName("scala.collection.immutable.Map$EmptyMap$"))
     kryo.register(Utils.classForName("scala.math.Ordering$Reverse"))
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index 1d4f61cfde9..9c941979e4e 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -551,6 +551,17 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
     val set = new OpenHashMap[Double, Double](10)
     ser.serialize(set)
   }
+
+  test("SPARK-43898: Register scala.collection.immutable.ArraySeq$ofRef for Scala 2.13") {
+    assume(scala.util.Properties.versionNumberString.startsWith("2.13"))
+    val conf = new SparkConf(false)
+    conf.set(KRYO_REGISTRATION_REQUIRED, true)
+    val ser = new KryoSerializer(conf).newInstance()
+    def check[T: ClassTag](t: T): Unit = {
+      assert(ser.deserialize[T](ser.serialize(t)) === t)
+    }
+    check(Utils.classForName("scala.collection.immutable.ArraySeq$ofRef"))
+  }
 }
 
 class KryoSerializerAutoResetDisabledSuite extends SparkFunSuite with SharedSparkContext {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]