This is an automated email from the ASF dual-hosted git repository.
attilapiros pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new 89d9c3cd5eda [SPARK-56448][CONNECT] Fix NPE on Spark Connect client
restart due to ammonite compile cache
89d9c3cd5eda is described below
commit 89d9c3cd5eda56d3e7fa3112c0fef3f9d4bce6fc
Author: Anupam Yadav <[email protected]>
AuthorDate: Sat May 16 09:40:05 2026 -0700
[SPARK-56448][CONNECT] Fix NPE on Spark Connect client restart due to
ammonite compile cache
The Spark Connect REPL uses Ammonite. Ammonite's default `Storage.Folder`
persists compiled predef classes under `~/.ammonite/<version>/cache`. On a
subsequent REPL start from the same working directory, the cached
`CodePredef` class is reloaded but its reference to the per-session
`ArgsPredef` helper is stale, producing a `NullPointerException` during
predef initialization.
This PR switches the Connect REPL's compile cache to `Storage.InMemory`
so every session starts fresh and no stale cache is carried across
restarts.
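As an illustration only, the difference between a cache that outlives the session and one that does not can be sketched with a toy model. Everything below (`StaleCacheSketch`, `CompiledPredef`, `PersistentCache`, `InMemoryCache`, `startSession`) is a hypothetical stand-in and not an Ammonite or Spark API. The real failure is a `NullPointerException` from a stale class reference, which this sketch simplifies to a session-id mismatch.

```scala
import scala.collection.mutable

// Toy model only: none of these types exist in Ammonite or Spark.
object StaleCacheSketch {
  // A "compiled predef" that remembers which session compiled it.
  final class CompiledPredef(val compiledInSession: Int)

  trait CompileCache {
    def load(key: String): Option[CompiledPredef]
    def save(key: String, value: CompiledPredef): Unit
  }

  // Backed by a map that outlives the session, like a cache folder on disk.
  final class PersistentCache(backing: mutable.Map[String, CompiledPredef])
      extends CompileCache {
    def load(key: String): Option[CompiledPredef] = backing.get(key)
    def save(key: String, value: CompiledPredef): Unit = backing.update(key, value)
  }

  // Owns its own map, so each new instance starts empty.
  final class InMemoryCache extends CompileCache {
    private val entries = mutable.Map.empty[String, CompiledPredef]
    def load(key: String): Option[CompiledPredef] = entries.get(key)
    def save(key: String, value: CompiledPredef): Unit = entries.update(key, value)
  }

  // Reuses a cached predef if one exists, otherwise "compiles" a new one.
  // Returns Left when the reused predef belongs to an earlier session.
  def startSession(sessionId: Int, cache: CompileCache): Either[String, Int] = {
    val predef = cache.load("predef").getOrElse {
      val compiled = new CompiledPredef(sessionId)
      cache.save("predef", compiled)
      compiled
    }
    if (predef.compiledInSession == sessionId) Right(sessionId)
    else Left(s"stale predef from session ${predef.compiledInSession}")
  }

  def main(args: Array[String]): Unit = {
    val disk = mutable.Map.empty[String, CompiledPredef]
    println(startSession(1, new PersistentCache(disk))) // first start compiles
    println(startSession(2, new PersistentCache(disk))) // restart reuses stale entry
    println(startSession(1, new InMemoryCache))         // fresh cache
    println(startSession(2, new InMemoryCache))         // fresh cache again
  }
}
```

In this model the second start fails only when the cache is shared across sessions, which mirrors why the change trades a recompile on each start for a clean slate.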
The stale-cache failure is a user-visible crash on every subsequent
invocation of `bin/spark-shell --remote sc://...` from the same working
directory. Reproduction steps are on the JIRA.
There is one minor observable tradeoff: because the compile cache is
now in-memory rather than persisted, each REPL start recompiles the
predef instead of reading the cached classfiles. This adds a few
hundred milliseconds to subsequent REPL startups but eliminates the
NPE. We believe this is the correct tradeoff: a small startup cost
is preferable to a hard failure.
Added `AmmoniteReplE2ESuite` with a test that starts
`bin/spark-shell --remote sc://...` twice and checks that both runs
succeed.
I verified the negative case locally by temporarily reverting only the
`Storage.InMemory()` line and re-running the test; it fails with:
```
- SPARK-56448: restarting spark-shell --remote does not throw NPE *** FAILED ***
1 did not equal 0 Second spark-shell failed (exit=1): WARNING: Using incubator modules: jdk.incubator.vector
Exception in thread "main" java.lang.NullPointerException: Cannot invoke "ammonite.predef.ArgsPredef$Helper.spark()" because the return value of "ammonite.predef.CodePredef.ArgsPredef()" is null
at ammonite.predef.CodePredef$Helper.<init>((console):7)
at ammonite.predef.CodePredef$.<clinit>((console):6)
at ammonite.predef.CodePredef.$main((console))
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:569)
at ammonite.runtime.Evaluator$$anon$1.$anonfun$evalMain$1(Evaluator.scala:108)
at ammonite.util.Util$.withContextClassloader(Util.scala:21)
at ammonite.runtime.Evaluator$$anon$1.evalMain(Evaluator.scala:90)
at ammonite.interp.Interpreter.$anonfun$processAllScriptBlocks$10(Interpreter.scala:594)
at ammonite.util.Res$Success.map(Res.scala:63)
at ammonite.interp.Interpreter.$anonfun$processAllScriptBlocks$9(Interpreter.scala:594)
at scala.Option$WithFilter.map(Option.scala:242)
at ammonite.interp.Interpreter.loop$1(Interpreter.scala:574)
at ammonite.interp.Interpreter.processAllScriptBlocks(Interpreter.scala:644)
at ammonite.interp.Interpreter.$anonfun$processModule$6(Interpreter.scala:432)
at ammonite.util.Catching.flatMap(Res.scala:110)
at ammonite.interp.Interpreter.$anonfun$processModule$5(Interpreter.scala:423)
...
```
Restoring the fix makes the test pass.
Yes
Closes #55720 from yadavay-amzn/fix/spark_SPARK-56448.
Authored-by: Anupam Yadav <[email protected]>
Signed-off-by: attilapiros <[email protected]>
(cherry picked from commit 3e83503adaa0f7d72424842d6613a4bd18d5a943)
Signed-off-by: attilapiros <[email protected]>
---
.../apache/spark/sql/application/ConnectRepl.scala | 2 +
.../sql/application/AmmoniteReplE2ESuite.scala | 71 ++++++++++++++++++++++
2 files changed, 73 insertions(+)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/application/ConnectRepl.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/application/ConnectRepl.scala
index 0360a4057886..cbee764eee39 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/application/ConnectRepl.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/application/ConnectRepl.scala
@@ -23,6 +23,7 @@ import scala.util.control.NonFatal
import ammonite.compiler.CodeClassWrapper
import ammonite.compiler.iface.CodeWrapper
+import ammonite.runtime.Storage
import ammonite.util.{Bind, Imports, Name, Util}
import org.apache.spark.annotation.DeveloperApi
@@ -95,6 +96,7 @@ object ConnectRepl {
val main = ammonite.Main(
welcomeBanner = Option(splash),
predefCode = predefCode,
+ storageBackend = new Storage.InMemory(),
replCodeWrapper = ExtendedCodeClassWrapper,
scriptCodeWrapper = ExtendedCodeClassWrapper,
inputStream = inputStream,
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/AmmoniteReplE2ESuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/AmmoniteReplE2ESuite.scala
new file mode 100644
index 000000000000..b2847c16ec6f
--- /dev/null
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/AmmoniteReplE2ESuite.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.application
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.ArrayBuffer
+import scala.sys.process.BasicIO
+
+import org.apache.spark.sql.test.{ConnectFunSuite, RemoteSparkSession}
+
+class AmmoniteReplE2ESuite extends ConnectFunSuite with RemoteSparkSession {
+
+ private def runSparkShell(): (Int, String, String) = {
+ val sparkHome = sys.props.getOrElse(
+ "spark.test.home",
+ sys.env.getOrElse("SPARK_HOME", fail("spark.test.home or SPARK_HOME not set")))
+ val command = Seq(s"$sparkHome/connector/connect/bin/spark-connect-scala-client", "--remote",
+ s"sc://localhost:$serverPort")
+
+ val process = new ProcessBuilder(command: _*).start()
+ // Close stdin immediately so shell exits on EOF
+ process.getOutputStream.close()
+
+ val stdout = new ArrayBuffer[String]()
+ val stderr = new ArrayBuffer[String]()
+ val stdoutThread = new Thread() {
+ setDaemon(true)
+ override def run(): Unit = BasicIO.processFully(stdout += _)(process.getInputStream)
+ }
+ val stderrThread = new Thread() {
+ setDaemon(true)
+ override def run(): Unit = BasicIO.processFully(stderr += _)(process.getErrorStream)
+ }
+ stdoutThread.start()
+ stderrThread.start()
+
+ val exited = process.waitFor(120, TimeUnit.SECONDS)
+ if (!exited) {
+ process.destroyForcibly()
+ fail("spark-connect-scala-client did not exit within 120 seconds")
+ }
+ stdoutThread.join(10000)
+ stderrThread.join(10000)
+ (process.exitValue(), stdout.mkString("\n"), stderr.mkString("\n"))
+ }
+
+ test("SPARK-56448: restarting spark-connect-scala-client --remote does not throw NPE") {
+ // First invocation
+ val (exit1, _, stderr1) = runSparkShell()
+ assert(exit1 == 0, s"First spark-connect-scala-client failed (exit=$exit1): $stderr1")
+
+ // Second invocation -- without the fix, this would NPE from stale Ammonite cache
+ val (exit2, _, stderr2) = runSparkShell()
+ assert(exit2 == 0, s"Second spark-connect-scala-client failed (exit=$exit2): $stderr2")
+ }
+}