This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 89d9c3cd5eda [SPARK-56448][CONNECT] Fix NPE on Spark Connect client 
restart due to ammonite compile cache
89d9c3cd5eda is described below

commit 89d9c3cd5eda56d3e7fa3112c0fef3f9d4bce6fc
Author: Anupam Yadav <[email protected]>
AuthorDate: Sat May 16 09:40:05 2026 -0700

    [SPARK-56448][CONNECT] Fix NPE on Spark Connect client restart due to 
ammonite compile cache
    
    The Spark Connect REPL uses Ammonite. Ammonite's default `Storage.Folder`
    persists compiled predef classes under `~/.ammonite/<version>/cache`. On a
    subsequent REPL start from the same working directory, the cached
    `CodePredef` class is reloaded but its reference to the per-session
    `ArgsPredef` helper is stale, producing a `NullPointerException` during
    predef initialization.
    
    This PR switches the Connect REPL's compile cache to `Storage.InMemory`
    so every session starts fresh and no stale cache is carried across
    restarts.
    
    The stale-cache failure is a user-visible crash on every subsequent call
    of `bin/spark-shell --remote sc://...` from the same working
    directory. Reproduction steps are on the JIRA.
    
    There is one minor observable tradeoff: because the compile cache is
    now in-memory rather than persisted, each REPL start recompiles the
    predef instead of reading the cached classfiles. This adds roughly a few
    hundred milliseconds to subsequent REPL startups but eliminates the
    NPE. We believe this is the correct tradeoff — a small startup cost
    is preferable to a hard failure.
    
    Added `AmmoniteReplE2ESuite` with a test starting
    `bin/spark-shell --remote sc://...` twice and checking that both runs
    were successful.
    
    I verified the negative case locally by temporarily reverting only the
    `Storage.InMemory()` line and re-running the test; it fails with:
    ```
    - SPARK-56448: restarting spark-shell --remote does not throw NPE *** 
FAILED ***
      1 did not equal 0 Second spark-shell failed (exit=1): WARNING: Using 
incubator modules: jdk.incubator.vector
      Exception in thread "main" java.lang.NullPointerException: Cannot invoke 
"ammonite.predef.ArgsPredef$Helper.spark()" because the return value of 
"ammonite.predef.CodePredef.ArgsPredef()" is null
            at ammonite.predef.CodePredef$Helper.<init>((console):7)
            at ammonite.predef.CodePredef$.<clinit>((console):6)
            at ammonite.predef.CodePredef.$main((console))
            at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
            at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.base/java.lang.reflect.Method.invoke(Method.java:569)
            at 
ammonite.runtime.Evaluator$$anon$1.$anonfun$evalMain$1(Evaluator.scala:108)
            at ammonite.util.Util$.withContextClassloader(Util.scala:21)
            at ammonite.runtime.Evaluator$$anon$1.evalMain(Evaluator.scala:90)
            at 
ammonite.interp.Interpreter.$anonfun$processAllScriptBlocks$10(Interpreter.scala:594)
            at ammonite.util.Res$Success.map(Res.scala:63)
            at 
ammonite.interp.Interpreter.$anonfun$processAllScriptBlocks$9(Interpreter.scala:594)
            at scala.Option$WithFilter.map(Option.scala:242)
            at ammonite.interp.Interpreter.loop$1(Interpreter.scala:574)
            at 
ammonite.interp.Interpreter.processAllScriptBlocks(Interpreter.scala:644)
            at 
ammonite.interp.Interpreter.$anonfun$processModule$6(Interpreter.scala:432)
            at ammonite.util.Catching.flatMap(Res.scala:110)
            at 
ammonite.interp.Interpreter.$anonfun$processModule$5(Interpreter.scala:423)
    ...
    ```
    
    Restoring the fix makes the test pass.
    
    Yes
    
    Closes #55720 from yadavay-amzn/fix/spark_SPARK-56448.
    
    Authored-by: Anupam Yadav <[email protected]>
    Signed-off-by: attilapiros <[email protected]>
    (cherry picked from commit 3e83503adaa0f7d72424842d6613a4bd18d5a943)
    Signed-off-by: attilapiros <[email protected]>
---
 .../apache/spark/sql/application/ConnectRepl.scala |  2 +
 .../sql/application/AmmoniteReplE2ESuite.scala     | 71 ++++++++++++++++++++++
 2 files changed, 73 insertions(+)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/application/ConnectRepl.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/application/ConnectRepl.scala
index 0360a4057886..cbee764eee39 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/application/ConnectRepl.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/application/ConnectRepl.scala
@@ -23,6 +23,7 @@ import scala.util.control.NonFatal
 
 import ammonite.compiler.CodeClassWrapper
 import ammonite.compiler.iface.CodeWrapper
+import ammonite.runtime.Storage
 import ammonite.util.{Bind, Imports, Name, Util}
 
 import org.apache.spark.annotation.DeveloperApi
@@ -95,6 +96,7 @@ object ConnectRepl {
     val main = ammonite.Main(
       welcomeBanner = Option(splash),
       predefCode = predefCode,
+      storageBackend = new Storage.InMemory(),
       replCodeWrapper = ExtendedCodeClassWrapper,
       scriptCodeWrapper = ExtendedCodeClassWrapper,
       inputStream = inputStream,
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/AmmoniteReplE2ESuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/AmmoniteReplE2ESuite.scala
new file mode 100644
index 000000000000..b2847c16ec6f
--- /dev/null
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/AmmoniteReplE2ESuite.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.application
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.ArrayBuffer
+import scala.sys.process.BasicIO
+
+import org.apache.spark.sql.test.{ConnectFunSuite, RemoteSparkSession}
+
+class AmmoniteReplE2ESuite extends ConnectFunSuite with RemoteSparkSession {
+
+  private def runSparkShell(): (Int, String, String) = {
+    val sparkHome = sys.props.getOrElse(
+      "spark.test.home",
+      sys.env.getOrElse("SPARK_HOME", fail("spark.test.home or SPARK_HOME not 
set")))
+    val command = 
Seq(s"$sparkHome/connector/connect/bin/spark-connect-scala-client", "--remote",
+      s"sc://localhost:$serverPort")
+
+    val process = new ProcessBuilder(command: _*).start()
+    // Close stdin immediately so shell exits on EOF
+    process.getOutputStream.close()
+
+    val stdout = new ArrayBuffer[String]()
+    val stderr = new ArrayBuffer[String]()
+    val stdoutThread = new Thread() {
+      setDaemon(true)
+      override def run(): Unit = BasicIO.processFully(stdout += 
_)(process.getInputStream)
+    }
+    val stderrThread = new Thread() {
+      setDaemon(true)
+      override def run(): Unit = BasicIO.processFully(stderr += 
_)(process.getErrorStream)
+    }
+    stdoutThread.start()
+    stderrThread.start()
+
+    val exited = process.waitFor(120, TimeUnit.SECONDS)
+    if (!exited) {
+      process.destroyForcibly()
+      fail("spark-connect-scala-client did not exit within 120 seconds")
+    }
+    stdoutThread.join(10000)
+    stderrThread.join(10000)
+    (process.exitValue(), stdout.mkString("\n"), stderr.mkString("\n"))
+  }
+
+  test("SPARK-56448: restarting spark-connect-scala-client --remote does not 
throw NPE") {
+    // First invocation
+    val (exit1, _, stderr1) = runSparkShell()
+    assert(exit1 == 0, s"First spark-connect-scala-client failed 
(exit=$exit1): $stderr1")
+
+    // Second invocation -- without the fix, this would NPE from stale 
Ammonite cache
+    val (exit2, _, stderr2) = runSparkShell()
+    assert(exit2 == 0, s"Second spark-connect-scala-client failed 
(exit=$exit2): $stderr2")
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to