This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5df1d79f224 [SPARK-43744][CONNECT][FOLLOW-UP] Throw error from the 
constructor
5df1d79f224 is described below

commit 5df1d79f22420734d3677b8aef03c46594b428e2
Author: Zhen Li <[email protected]>
AuthorDate: Sun Jul 30 10:58:29 2023 +0900

    [SPARK-43744][CONNECT][FOLLOW-UP] Throw error from the constructor
    
    ### What changes were proposed in this pull request?
    Made the stub constructor throw ClassNotFoundException when called.
    A tiny improvement to avoid recreating class loaders in the executor when stubbing is 
not enabled.
    
    ### Why are the changes needed?
    Enhancement to https://github.com/apache/spark/pull/42069
    Should be merged to 3.5.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Unit tests.
    
    Closes #42222 from zhenlineo/error-from-constuctor.
    
    Authored-by: Zhen Li <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../artifact/SparkConnectArtifactManager.scala     |  6 +++---
 .../connect/artifact/StubClassLoaderSuite.scala    | 15 ++++++++++++++
 .../scala/org/apache/spark/executor/Executor.scala | 23 +++++++++++++---------
 .../org/apache/spark/internal/config/package.scala |  4 ++--
 .../org/apache/spark/util/StubClassLoader.scala    | 18 +++++++++++++++--
 5 files changed, 50 insertions(+), 16 deletions(-)

diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala
index 03391cef68b..c1dd7820c55 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala
@@ -31,7 +31,7 @@ import org.apache.hadoop.fs.{LocalFileSystem, Path => FSPath}
 
 import org.apache.spark.{JobArtifactSet, JobArtifactState, SparkContext, 
SparkEnv}
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.CONNECT_SCALA_UDF_STUB_CLASSES
+import org.apache.spark.internal.config.CONNECT_SCALA_UDF_STUB_PREFIXES
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connect.artifact.util.ArtifactUtils
 import 
org.apache.spark.sql.connect.config.Connect.CONNECT_COPY_FROM_LOCAL_TO_FS_ALLOW_DEST_LOCAL
@@ -162,9 +162,9 @@ class SparkConnectArtifactManager(sessionHolder: 
SessionHolder) extends Logging
    */
   def classloader: ClassLoader = {
     val urls = getSparkConnectAddedJars :+ classDir.toUri.toURL
-    val loader = if 
(SparkEnv.get.conf.get(CONNECT_SCALA_UDF_STUB_CLASSES).nonEmpty) {
+    val loader = if 
(SparkEnv.get.conf.get(CONNECT_SCALA_UDF_STUB_PREFIXES).nonEmpty) {
       val stubClassLoader =
-        StubClassLoader(null, 
SparkEnv.get.conf.get(CONNECT_SCALA_UDF_STUB_CLASSES))
+        StubClassLoader(null, 
SparkEnv.get.conf.get(CONNECT_SCALA_UDF_STUB_PREFIXES))
       new ChildFirstURLClassLoader(
         urls.toArray,
         stubClassLoader,
diff --git 
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/artifact/StubClassLoaderSuite.scala
 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/artifact/StubClassLoaderSuite.scala
index 0f6e0543151..bde9a71fa17 100644
--- 
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/artifact/StubClassLoaderSuite.scala
+++ 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/artifact/StubClassLoaderSuite.scala
@@ -55,6 +55,21 @@ class StubClassLoaderSuite extends SparkFunSuite {
     }
   }
 
+  test("call stub class default constructor") {
+    val cl = new RecordedStubClassLoader(getClass().getClassLoader(), _ => 
true)
+    // scalastyle:off classforname
+    val cls = Class.forName("my.name.HelloWorld", false, cl)
+    // scalastyle:on classforname
+    assert(cl.lastStubbed === "my.name.HelloWorld")
+    val error = intercept[java.lang.reflect.InvocationTargetException] {
+      cls.getDeclaredConstructor().newInstance()
+    }
+    assert(
+      error.getCause != null && error.getCause.getMessage.contains(
+        "Fail to initiate the class my.name.HelloWorld because it is stubbed"),
+      error)
+  }
+
   test("stub missing class") {
     val sysClassLoader = getClass.getClassLoader()
     val stubClassLoader = new RecordedStubClassLoader(null, _ => true)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 9327ea4d3dd..1b7bb8af79a 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -174,7 +174,8 @@ private[spark] class Executor(
     val currentFiles = new HashMap[String, Long]
     val currentJars = new HashMap[String, Long]
     val currentArchives = new HashMap[String, Long]
-    val urlClassLoader = createClassLoader(currentJars, 
!isDefaultState(jobArtifactState.uuid))
+    val urlClassLoader =
+      createClassLoader(currentJars, 
isStubbingEnabledForState(jobArtifactState.uuid))
     val replClassLoader = addReplClassLoaderIfNeeded(
       urlClassLoader, jobArtifactState.replClassDirUri, jobArtifactState.uuid)
     new IsolatedSessionState(
@@ -186,6 +187,11 @@ private[spark] class Executor(
     )
   }
 
+  private def isStubbingEnabledForState(name: String) = {
+    !isDefaultState(name) &&
+      conf.get(CONNECT_SCALA_UDF_STUB_PREFIXES).nonEmpty
+  }
+
   private def isDefaultState(name: String) = name == "default"
 
   // Classloader isolation
@@ -1031,8 +1037,8 @@ private[spark] class Executor(
       urls.mkString("'", ",", "'")
     )
 
-    if (useStub && conf.get(CONNECT_SCALA_UDF_STUB_CLASSES).nonEmpty) {
-      createClassLoaderWithStub(urls, conf.get(CONNECT_SCALA_UDF_STUB_CLASSES))
+    if (useStub) {
+      createClassLoaderWithStub(urls, 
conf.get(CONNECT_SCALA_UDF_STUB_PREFIXES))
     } else {
       createClassLoader(urls)
     }
@@ -1093,7 +1099,7 @@ private[spark] class Executor(
       state: IsolatedSessionState,
       testStartLatch: Option[CountDownLatch] = None,
       testEndLatch: Option[CountDownLatch] = None): Unit = {
-    var updated = false;
+    var renewClassLoader = false;
     lazy val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
     updateDependenciesLock.lockInterruptibly()
     try {
@@ -1149,15 +1155,14 @@ private[spark] class Executor(
           if (!state.urlClassLoader.getURLs().contains(url)) {
             logInfo(s"Adding $url to class loader ${state.sessionUUID}")
             state.urlClassLoader.addURL(url)
-            if (!isDefaultState(state.sessionUUID)) {
-              updated = true
+            if (isStubbingEnabledForState(state.sessionUUID)) {
+              renewClassLoader = true
             }
           }
         }
       }
-      if (updated) {
-        // When a new url is added for non-default class loader, recreate the 
class loader
-        // to ensure all classes are updated.
+      if (renewClassLoader) {
+        // Recreate the class loader to ensure all classes are updated.
         state.urlClassLoader = createClassLoader(state.urlClassLoader.getURLs, 
useStub = true)
         state.replClassLoader =
           addReplClassLoaderIfNeeded(state.urlClassLoader, 
state.replClassDirUri, state.sessionUUID)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index ba809b7a3b1..81226706d67 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2556,8 +2556,8 @@ package object config {
       .booleanConf
       .createWithDefault(false)
 
-  private[spark] val CONNECT_SCALA_UDF_STUB_CLASSES =
-    ConfigBuilder("spark.connect.scalaUdf.stubClasses")
+  private[spark] val CONNECT_SCALA_UDF_STUB_PREFIXES =
+    ConfigBuilder("spark.connect.scalaUdf.stubPrefixes")
       .internal()
       .doc("""
           |Comma-separated list of binary names of classes/packages that 
should be stubbed during
diff --git a/core/src/main/scala/org/apache/spark/util/StubClassLoader.scala 
b/core/src/main/scala/org/apache/spark/util/StubClassLoader.scala
index a0bc753f488..e27376e2b83 100644
--- a/core/src/main/scala/org/apache/spark/util/StubClassLoader.scala
+++ b/core/src/main/scala/org/apache/spark/util/StubClassLoader.scala
@@ -70,8 +70,22 @@ object StubClassLoader {
       "()V",
       false)
 
-    ctorWriter.visitInsn(Opcodes.RETURN)
-    ctorWriter.visitMaxs(1, 1)
+    val internalException: String = "java/lang/ClassNotFoundException"
+    ctorWriter.visitTypeInsn(Opcodes.NEW, internalException)
+    ctorWriter.visitInsn(Opcodes.DUP)
+    ctorWriter.visitLdcInsn(
+      s"Fail to initiate the class $binaryName because it is stubbed. " +
+        "Please install the artifact of the missing class by calling 
session.addArtifact.")
+    // Invoke throwable constructor
+    ctorWriter.visitMethodInsn(
+      Opcodes.INVOKESPECIAL,
+      internalException,
+      "<init>",
+      "(Ljava/lang/String;)V",
+      false)
+
+    ctorWriter.visitInsn(Opcodes.ATHROW)
+    ctorWriter.visitMaxs(3, 3)
     ctorWriter.visitEnd()
     classWriter.visitEnd()
     classWriter.toByteArray


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to