This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 25053d98186 [SPARK-44689][CONNECT] Make the exception handling of function `SparkConnectPlanner#unpackScalarScalaUDF` more universal
25053d98186 is described below

commit 25053d98186489d9f2061c9b815a5a33f7e309c4
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Tue Aug 8 11:06:21 2023 +0800

[SPARK-44689][CONNECT] Make the exception handling of function `SparkConnectPlanner#unpackScalarScalaUDF` more universal

### What changes were proposed in this pull request?
This PR changes the exception handling in the `unpackScalarScalaUDF` function of `SparkConnectPlanner`: instead of checking the exception type at a fixed nesting level, it uses Guava `Throwables` to get the root cause and then checks the type of that root cause. This makes the handling robust to differences in exception nesting across Java versions.
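In outline, the change replaces a fixed-shape match with a root-cause match. The following is a compilable sketch of that pattern, not the planner code itself (`UnpackSketch` and its `deserialize` stub are hypothetical stand-ins; the real change is in the diff at the bottom of this message):

```scala
import com.google.common.base.Throwables

object UnpackSketch {
  // Hypothetical stand-in for the real payload deserialization
  // (Utils.deserialize in the actual planner). It fails the way the
  // Java 17 run does: the root cause is nested two levels deep.
  private def deserialize(payload: Array[Byte]): AnyRef =
    throw new RuntimeException(
      "Exception in SerializedLambda.readResolve",
      new java.security.PrivilegedActionException(
        new NoSuchMethodException("SomeUdf.$deserializeLambda$")))

  def unpack(payload: Array[Byte]): AnyRef =
    try {
      deserialize(payload)
    } catch {
      case t: Throwable =>
        // Match on the root cause rather than on a fixed nesting level,
        // so any wrapping depth produces the same user-facing error.
        Throwables.getRootCause(t) match {
          case nsm: NoSuchMethodException =>
            throw new ClassNotFoundException(
              s"Failed to load class correctly due to $nsm. " +
                "Make sure the artifact where the class is defined is installed" +
                " by calling session.addArtifact.")
          case _ => throw t
        }
    }

  def main(args: Array[String]): Unit =
    try unpack(Array.emptyByteArray)
    catch { case e: ClassNotFoundException => println(e.getMessage) }
}
```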
### Why are the changes needed?
The following failures occurred when testing `UDFClassLoadingE2ESuite` in the Java 17 daily test: https://github.com/apache/spark/actions/runs/5766913899/job/15635782831

```
[info] UDFClassLoadingE2ESuite:
[info] - update class loader after stubbing: new session *** FAILED *** (101 milliseconds)
[info]   "Exception in SerializedLambda.readResolve" did not contain "java.lang.NoSuchMethodException: org.apache.spark.sql.connect.client.StubClassDummyUdf" (UDFClassLoadingE2ESuite.scala:57)
...
[info] - update class loader after stubbing: same session *** FAILED *** (52 milliseconds)
[info]   "Exception in SerializedLambda.readResolve" did not contain "java.lang.NoSuchMethodException: org.apache.spark.sql.connect.client.StubClassDummyUdf" (UDFClassLoadingE2ESuite.scala:73)
...
```

After analysis, it was found that the exception stacks generated on the server side differ between Java 8 and Java 17:

- Java 8

```
java.io.IOException: unexpected exception type
    at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1750)
    at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1280)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2222)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
    at org.apache.spark.util.SparkSerDeUtils.deserialize(SparkSerDeUtils.scala:50)
    at org.apache.spark.util.SparkSerDeUtils.deserialize$(SparkSerDeUtils.scala:41)
    at org.apache.spark.util.Utils$.deserialize(Utils.scala:95)
    at org.apache.spark.sql.connect.planner.SparkConnectPlanner.unpackScalarScalaUDF(SparkConnectPlanner.scala:1516)
    at org.apache.spark.sql.connect.planner.SparkConnectPlanner.org$apache$spark$sql$connect$planner$SparkConnectPlanner$$unpackUdf(SparkConnectPlanner.scala:1507)
    at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformScalarScalaFunction(SparkConnectPlanner.scala:1544)
    at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleRegisterScalarScalaUDF(SparkConnectPlanner.scala:2565)
    at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleRegisterUserDefinedFunction(SparkConnectPlanner.scala:2492)
    at org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2363)
    at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handleCommand(ExecuteThreadRunner.scala:202)
    at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:158)
    at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:132)
    at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:184)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:184)
    at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
    at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withContextClassLoader$1(SessionHolder.scala:171)
    at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:179)
    at org.apache.spark.sql.connect.service.SessionHolder.withContextClassLoader(SessionHolder.scala:170)
    at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:183)
    at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:132)
    at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:84)
    at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:227)
Caused by: java.lang.NoSuchMethodException: org.apache.spark.sql.connect.client.StubClassDummyUdf.$deserializeLambda$(java.lang.invoke.SerializedLambda)
    at java.lang.Class.getDeclaredMethod(Class.java:2130)
    at java.lang.invoke.SerializedLambda$1.run(SerializedLambda.java:224)
    at java.lang.invoke.SerializedLambda$1.run(SerializedLambda.java:221)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:221)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1274)
    ... 31 more
```

- Java 17

```
java.lang.RuntimeException: Exception in SerializedLambda.readResolve
    at java.base/java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:288)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at java.base/java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1190)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2266)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
    at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:509)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:467)
    at org.apache.spark.util.SparkSerDeUtils.deserialize(SparkSerDeUtils.scala:50)
    at org.apache.spark.util.SparkSerDeUtils.deserialize$(SparkSerDeUtils.scala:41)
    at org.apache.spark.util.Utils$.deserialize(Utils.scala:95)
    at org.apache.spark.sql.connect.planner.SparkConnectPlanner.unpackScalarScalaUDF(SparkConnectPlanner.scala:1517)
    at org.apache.spark.sql.connect.planner.SparkConnectPlanner.org$apache$spark$sql$connect$planner$SparkConnectPlanner$$unpackUdf(SparkConnectPlanner.scala:1507)
    at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformScalarScalaFunction(SparkConnectPlanner.scala:1552)
    at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleRegisterScalarScalaUDF(SparkConnectPlanner.scala:2573)
    at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleRegisterUserDefinedFunction(SparkConnectPlanner.scala:2500)
    at org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2371)
    at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handleCommand(ExecuteThreadRunner.scala:202)
    at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:158)
    at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:132)
    at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:184)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:184)
    at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
    at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withContextClassLoader$1(SessionHolder.scala:171)
    at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:179)
    at org.apache.spark.sql.connect.service.SessionHolder.withContextClassLoader(SessionHolder.scala:170)
    at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:183)
    at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:132)
    at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:84)
    at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:227)
Caused by: java.security.PrivilegedActionException: java.lang.NoSuchMethodException: org.apache.spark.sql.connect.client.StubClassDummyUdf.$deserializeLambda$(java.lang.invoke.SerializedLambda)
    at java.base/java.security.AccessController.doPrivileged(AccessController.java:573)
    at java.base/java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:269)
    ... 36 more
Caused by: java.lang.NoSuchMethodException: org.apache.spark.sql.connect.client.StubClassDummyUdf.$deserializeLambda$(java.lang.invoke.SerializedLambda)
    at java.base/java.lang.Class.getDeclaredMethod(Class.java:2675)
    at java.base/java.lang.invoke.SerializedLambda$1.run(SerializedLambda.java:272)
    at java.base/java.lang.invoke.SerializedLambda$1.run(SerializedLambda.java:269)
    at java.base/java.security.AccessController.doPrivileged(AccessController.java:569)
    ... 37 more
```
While the root exception is `NoSuchMethodException` in both cases, the levels of nesting differ. We could add another exception-check branch to make the code compatible with Java 17, for example:

```scala
case e: IOException if e.getCause.isInstanceOf[NoSuchMethodException] =>
  throw new ClassNotFoundException(... ${e.getCause} ...)
case e: RuntimeException
    if e.getCause != null && e.getCause.getCause.isInstanceOf[NoSuchMethodException] =>
  throw new ClassNotFoundException(... ${e.getCause.getCause} ...)
```

But if a future Java version changes the nesting of exceptions again, this part of the code would need yet another modification. Therefore, this PR instead fetches the root cause of the exception and performs the type check on that root cause, making the handling as universal as possible.
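As a self-contained illustration that the root-cause approach is insensitive to wrapping depth, the snippet below rebuilds the two failure shapes with fabricated stand-in exceptions (they mirror the Java 8 and Java 17 stacks above but are not the real deserialization failures) and unwraps both with `Throwables.getRootCause`:

```scala
import java.io.IOException
import java.security.PrivilegedActionException

import com.google.common.base.Throwables

object RootCauseCheck {
  def main(args: Array[String]): Unit = {
    // Fabricated stand-in for the Java 8 shape: the NoSuchMethodException
    // is wrapped exactly once, in an IOException.
    val java8Shape: Throwable = new IOException(
      "unexpected exception type",
      new NoSuchMethodException("StubClassDummyUdf.$deserializeLambda$"))

    // Fabricated stand-in for the Java 17 shape: the same root cause is
    // wrapped twice (a PrivilegedActionException inside a RuntimeException).
    val java17Shape: Throwable = new RuntimeException(
      "Exception in SerializedLambda.readResolve",
      new PrivilegedActionException(
        new NoSuchMethodException("StubClassDummyUdf.$deserializeLambda$")))

    // Throwables.getRootCause walks getCause() to the innermost throwable,
    // so both shapes resolve to the same NoSuchMethodException.
    for (t <- Seq(java8Shape, java17Shape)) {
      assert(Throwables.getRootCause(t).isInstanceOf[NoSuchMethodException])
    }
    println("both shapes unwrap to NoSuchMethodException")
  }
}
```

Both assertions pass, which is why the single root-cause branch can replace the per-version branches sketched above.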
### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- Pass GitHub Actions
- Manually checked with Java 17:

```
java -version
openjdk version "17.0.8" 2023-07-18 LTS
OpenJDK Runtime Environment Zulu17.44+15-CA (build 17.0.8+7-LTS)
OpenJDK 64-Bit Server VM Zulu17.44+15-CA (build 17.0.8+7-LTS, mixed mode, sharing)
```

Run:

```
build/sbt clean "connect-client-jvm/testOnly *UDFClassLoadingE2ESuite" -Phive
```

Before:

```
[info] UDFClassLoadingE2ESuite:
[info] - update class loader after stubbing: new session *** FAILED *** (60 milliseconds)
[info]   "Exception in SerializedLambda.readResolve" did not contain "java.lang.NoSuchMethodException: org.apache.spark.sql.connect.client.StubClassDummyUdf" (UDFClassLoadingE2ESuite.scala:57)
...
[info] - update class loader after stubbing: same session *** FAILED *** (15 milliseconds)
[info]   "Exception in SerializedLambda.readResolve" did not contain "java.lang.NoSuchMethodException: org.apache.spark.sql.connect.client.StubClassDummyUdf" (UDFClassLoadingE2ESuite.scala:73)
...
[info] Run completed in 9 seconds, 565 milliseconds.
[info] Total number of tests run: 2
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 0, failed 2, canceled 0, ignored 0, pending 0
[info] *** 2 TESTS FAILED ***
[error] Failed tests:
[error] 	org.apache.spark.sql.connect.client.UDFClassLoadingE2ESuite
[error] (connect-client-jvm / Test / testOnly) sbt.TestsFailedException: Tests unsuccessful
```

After:

```
[info] UDFClassLoadingE2ESuite:
[info] - update class loader after stubbing: new session (116 milliseconds)
[info] - update class loader after stubbing: same session (41 milliseconds)
[info] Run completed in 9 seconds, 781 milliseconds.
[info] Total number of tests run: 2
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 2, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

Closes #42360 from LuciferYang/unpackScalarScalaUDF-exception-java17.

Authored-by: yangjie01 <yangji...@baidu.com>
Signed-off-by: yangjie01 <yangji...@baidu.com>
---
 .../spark/sql/connect/planner/SparkConnectPlanner.scala | 17 ++++++++++-------
 1 file changed, 10 insertions(+), 7 deletions(-)

diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 7136476b515..f70a17e580a 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -17,12 +17,11 @@
 
 package org.apache.spark.sql.connect.planner
 
-import java.io.IOException
-
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.Try
 
+import com.google.common.base.Throwables
 import com.google.common.collect.{Lists, Maps}
 import com.google.protobuf.{Any => ProtoAny, ByteString}
 import io.grpc.{Context, Status, StatusRuntimeException}
@@ -1518,11 +1517,15 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
       logDebug(s"Unpack using class loader: ${Utils.getContextOrSparkClassLoader}")
       Utils.deserialize[T](fun.getPayload.toByteArray, Utils.getContextOrSparkClassLoader)
     } catch {
-      case e: IOException if e.getCause.isInstanceOf[NoSuchMethodException] =>
-        throw new ClassNotFoundException(
-          s"Failed to load class correctly due to ${e.getCause}. " +
-            "Make sure the artifact where the class is defined is installed by calling" +
-            " session.addArtifact.")
+      case t: Throwable =>
+        Throwables.getRootCause(t) match {
+          case nsm: NoSuchMethodException =>
+            throw new ClassNotFoundException(
+              s"Failed to load class correctly due to $nsm. " +
+                "Make sure the artifact where the class is defined is installed by calling" +
+                " session.addArtifact.")
+          case _ => throw t
+        }
     }
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org