This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 398435fcd6e [SPARK-43898][CORE] Automatically register `immutable.ArraySeq$ofRef` to `KryoSerializer` for Scala 2.13
398435fcd6e is described below

commit 398435fcd6eb8623219c8d6c3e5966463ebe4429
Author: yangjie01 <[email protected]>
AuthorDate: Wed May 31 19:23:30 2023 -0700

    [SPARK-43898][CORE] Automatically register `immutable.ArraySeq$ofRef` to `KryoSerializer` for Scala 2.13
    
    SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt clean "sql/Test/runMain org.apache.spark.sql.execution.benchmark.TPCDSQueryBenchmark --data-location /Users/yangjie01/Tools/tpcds-sf-1 --query-filter q17" -Pscala-2.13
    
    ### What changes were proposed in this pull request?
    This PR aims to automatically register `immutable.ArraySeq$ofRef` with `KryoSerializer` for Scala 2.13, so that `TPCDSQueryBenchmark` can run successfully with Scala 2.13.
    
    ### Why are the changes needed?
    Scala 2.13 introduced `scala.collection.immutable.ArraySeq$ofRef`, but it is not registered with `KryoSerializer`. As a result, running `TPCDSQueryBenchmark` with `KryoSerializer` and `spark.kryo.registrationRequired` enabled (the default behavior since SPARK-42074) fails with the following error (a short illustration of where this class arises follows the stack trace):
    
    ```
    Error: Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Failed to serialize task 741, not attempting to retry it. Exception during serialization: java.io.IOException: java.lang.IllegalArgumentException: Class is not registered: scala.collection.immutable.ArraySeq$ofRef
    Note: To register this class use: kryo.register(scala.collection.immutable.ArraySeq$ofRef.class);
            at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2815)
            at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2751)
            at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2750)
            at scala.collection.immutable.List.foreach(List.scala:333)
            at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2750)
            at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1218)
            at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1218)
            at scala.Option.foreach(Option.scala:437)
            at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1218)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3014)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2953)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2942)
            at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
            at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:983)
            at org.apache.spark.SparkContext.runJob(SparkContext.scala:2285)
            at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:385)
            at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:359)
            at org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExec.writeWithV2(WriteToDataSourceV2Exec.scala:243)
            at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:337)
            at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:336)
            at org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExec.run(WriteToDataSourceV2Exec.scala:243)
            at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
            at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
            at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
            at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
            at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
            at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
            at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
            at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:825)
            at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
            at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
            at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
            at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
            at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
            at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
            at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
            at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
            at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
            at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
            at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
            at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
            at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
            at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
            at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
            at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133)
            at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:858)
            at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:318)
            at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
            at org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark$DatasetToBenchmark.noop(SqlBasedBenchmark.scala:70)
            at org.apache.spark.sql.execution.benchmark.TPCDSQueryBenchmark$.$anonfun$runTpcdsQueries$5(TPCDSQueryBenchmark.scala:111)
            at org.apache.spark.benchmark.Benchmark.$anonfun$addCase$1(Benchmark.scala:77)
            at org.apache.spark.benchmark.Benchmark.$anonfun$addCase$1$adapted(Benchmark.scala:75)
            at org.apache.spark.benchmark.Benchmark.measure(Benchmark.scala:140)
            at org.apache.spark.benchmark.Benchmark.$anonfun$run$1(Benchmark.scala:106)
            at scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:100)
            at scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:87)
            at scala.collection.mutable.ArrayBuffer.map(ArrayBuffer.scala:43)
            at org.apache.spark.benchmark.Benchmark.run(Benchmark.scala:104)
            at org.apache.spark.sql.execution.benchmark.TPCDSQueryBenchmark$.$anonfun$runTpcdsQueries$1(TPCDSQueryBenchmark.scala:113)
            at org.apache.spark.sql.execution.benchmark.TPCDSQueryBenchmark$.$anonfun$runTpcdsQueries$1$adapted(TPCDSQueryBenchmark.scala:91)
            at scala.collection.immutable.List.foreach(List.scala:333)
            at org.apache.spark.sql.execution.benchmark.TPCDSQueryBenchmark$.runTpcdsQueries(TPCDSQueryBenchmark.scala:91)
            at org.apache.spark.sql.execution.benchmark.TPCDSQueryBenchmark$.runBenchmarkSuite(TPCDSQueryBenchmark.scala:185)
            at org.apache.spark.benchmark.BenchmarkBase.main(BenchmarkBase.scala:72)
            at org.apache.spark.sql.execution.benchmark.TPCDSQueryBenchmark.main(TPCDSQueryBenchmark.scala)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at org.apache.spark.benchmark.Benchmarks$.$anonfun$main$7(Benchmarks.scala:128)
            at scala.collection.ArrayOps$.foreach$extension(ArrayOps.scala:1328)
            at org.apache.spark.benchmark.Benchmarks$.main(Benchmarks.scala:91)
            at org.apache.spark.benchmark.Benchmarks.main(Benchmarks.scala)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
            at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1025)
            at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192)
            at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215)
            at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
            at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1116)
            at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1125)
            at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    ```
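
    For context, `ArraySeq$ofRef` is the runtime class Scala 2.13 uses when an array of references is wrapped into an immutable `Seq`, which is presumably how it ends up in the data that tasks serialize here. A minimal sketch (plain Scala 2.13, independent of this patch) of where the class appears:

    ```scala
    import scala.collection.immutable.ArraySeq

    // Wrapping a reference array in Scala 2.13 yields ArraySeq$ofRef,
    // which Kryo must know about when spark.kryo.registrationRequired is enabled.
    val seq: Seq[String] = ArraySeq.unsafeWrapArray(Array("a", "b"))
    println(seq.getClass.getName) // scala.collection.immutable.ArraySeq$ofRef
    ```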
    
    `KryoSerializer` cannot be used for Scala 2.13 Spark in this scenario because `immutable.ArraySeq$ofRef` is missing from its registrations, so this PR adds that registration for Scala 2.13.
    
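    For reference, a minimal sketch of a user-side workaround on Spark builds that do not yet include this registration (the keys below are existing Spark configuration options; the workaround itself is not part of this patch):

    ```scala
    import org.apache.spark.SparkConf

    // Explicitly register the Scala 2.13 class by name so that
    // spark.kryo.registrationRequired does not reject it at serialization time.
    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrationRequired", "true")
      .set("spark.kryo.classesToRegister", "scala.collection.immutable.ArraySeq$ofRef")
    ```
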
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    - Add a new test case to check serde of `scala.collection.immutable.ArraySeq$ofRef` for Scala 2.13.
    - Check `TPCDSQueryBenchmark` on GitHub Actions:
    
    **Before**
    
    https://github.com/LuciferYang/spark/actions/runs/5129288422/jobs/9226895410
    
    <img width="1290" alt="image" 
src="https://github.com/apache/spark/assets/1475305/4effdc9e-153c-4126-bbaf-3206ab98579e";>
    
    **After**
    
    https://github.com/LuciferYang/spark/actions/runs/5132285127
    
    <img width="1225" alt="image" 
src="https://github.com/apache/spark/assets/1475305/89c4e2ac-848b-4acb-9d01-1bb4b86d09a1";>
    
    Closes #41402 from LuciferYang/SPARK-43898.
    
    Authored-by: yangjie01 <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../scala/org/apache/spark/serializer/KryoSerializer.scala    |  4 ++++
 .../org/apache/spark/serializer/KryoSerializerSuite.scala     | 11 +++++++++++
 2 files changed, 15 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 2bbc15c490c..826d6789f88 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -26,6 +26,7 @@ import javax.annotation.Nullable
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
+import scala.util.Properties
 import scala.util.control.NonFatal
 
 import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => KryoClassSerializer}
@@ -219,6 +220,9 @@ class KryoSerializer(conf: SparkConf)
 
     kryo.register(None.getClass)
     kryo.register(Nil.getClass)
+    if (Properties.versionNumberString.startsWith("2.13")) {
+      kryo.register(Utils.classForName("scala.collection.immutable.ArraySeq$ofRef"))
+    }
     kryo.register(Utils.classForName("scala.collection.immutable.$colon$colon"))
     kryo.register(Utils.classForName("scala.collection.immutable.Map$EmptyMap$"))
     kryo.register(Utils.classForName("scala.math.Ordering$Reverse"))
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index 1d4f61cfde9..9c941979e4e 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -551,6 +551,17 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
     val set = new OpenHashMap[Double, Double](10)
     ser.serialize(set)
   }
+
+  test("SPARK-43898: Register scala.collection.immutable.ArraySeq$ofRef for 
Scala 2.13") {
+    assume(scala.util.Properties.versionNumberString.startsWith("2.13"))
+    val conf = new SparkConf(false)
+    conf.set(KRYO_REGISTRATION_REQUIRED, true)
+    val ser = new KryoSerializer(conf).newInstance()
+    def check[T: ClassTag](t: T): Unit = {
+      assert(ser.deserialize[T](ser.serialize(t)) === t)
+    }
+    check(Utils.classForName("scala.collection.immutable.ArraySeq$ofRef"))
+  }
 }
 
class KryoSerializerAutoResetDisabledSuite extends SparkFunSuite with SharedSparkContext {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]