This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 18e07956e94 [SPARK-45739][PYTHON] Catch IOException instead of EOFException alone for faulthandler
18e07956e94 is described below
commit 18e07956e9476bf3e9264eb878a25b838feff4a6
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Wed Nov 1 07:33:35 2023 +0900
[SPARK-45739][PYTHON] Catch IOException instead of EOFException alone for faulthandler
### What changes were proposed in this pull request?
This PR improves the `spark.python.worker.faulthandler.enabled` feature by catching `IOException` instead of the narrower `EOFException`.
### Why are the changes needed?
Exceptions such as `java.net.SocketException: Connection reset` can occur when the worker dies unexpectedly, so we should catch all I/O exceptions there rather than `EOFException` alone.
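For context, `EOFException` and `java.net.SocketException` are both subclasses of `java.io.IOException`, so a single `IOException` handler covers either failure mode. A minimal standalone sketch (not Spark code) illustrating the hierarchy:
```scala
// Minimal sketch (not Spark code): EOFException and SocketException both
// extend java.io.IOException, so one IOException handler matches either.
import java.io.{EOFException, IOException}
import java.net.SocketException

object IOExceptionHierarchyDemo {
  def classify(e: Throwable): String = e match {
    case io: IOException => s"handled as IOException: ${io.getClass.getSimpleName}"
    case other => s"not handled: ${other.getClass.getSimpleName}"
  }

  def main(args: Array[String]): Unit = {
    println(classify(new EOFException()))                       // handled as IOException: EOFException
    println(classify(new SocketException("Connection reset")))  // handled as IOException: SocketException
  }
}
```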
### Does this PR introduce _any_ user-facing change?
Yes, but only in special cases: this can happen when the worker dies unexpectedly during its initialization.
### How was this patch tested?
I tested this with Spark Connect:
```bash
$ cat <<EOT >> malformed_daemon.py
import ctypes

from pyspark import daemon
from pyspark import TaskContext


def raise_segfault():
    ctypes.string_at(0)


# Throw a segmentation fault during init.
TaskContext._getOrCreate = raise_segfault

if __name__ == '__main__':
    daemon.manager()
EOT
```
```bash
./sbin/stop-connect-server.sh
./sbin/start-connect-server.sh --conf spark.python.daemon.module=malformed_daemon --conf spark.python.worker.faulthandler.enabled=true --jars `ls connector/connect/server/target/**/spark-connect*SNAPSHOT.jar`
```
```bash
./bin/pyspark --remote "sc://localhost:15002"
```
```python
from pyspark.sql.functions import udf
spark.addArtifact("malformed_daemon.py", pyfile=True)
spark.range(1).select(udf(lambda x: x)("id")).collect()
```
**Before**
```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/sql/connect/dataframe.py", line 1710, in collect
    table, schema = self._to_table()
  ...
  File "/.../spark/python/pyspark/sql/connect/client/core.py", line 1575, in _handle_rpc_error
    raise convert_exception(
pyspark.errors.exceptions.connect.SparkConnectGrpcException: (org.apache.spark.SparkException) Job aborted due to stage failure: Task 8 in stage 0.0 failed 1 times, most recent failure: Lost task 8.0 in stage 0.0 (TID 8) (192.168.123.102 executor driver): java.net.SocketException: Connection reset
    at ...
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)

Driver stacktrace:

JVM stacktrace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 0.0 failed 1 times, most recent failure: Lost task 8.0 in stage 0.0 (TID 8) (192.168.123.102 executor driver): java.net.SocketException: Connection reset
    at java.base/sun.nio.ch.SocketChannelImpl.throwConnectionReset(SocketChannelImpl.java:394)
    at ...
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.lang.Thread.run(Thread.java:833)
```
**After**
```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/sql/connect/dataframe.py", line 1710, in collect
    table, schema = self._to_table()
  ...
  File "/.../spark/python/pyspark/sql/connect/client/core.py", line 1575, in _handle_rpc_error
    raise convert_exception(
pyspark.errors.exceptions.connect.SparkConnectGrpcException: (org.apache.spark.SparkException) Job aborted due to stage failure: Task 4 in stage 0.0 failed 1 times, most recent failure: Lost task 4.0 in stage 0.0 (TID 4) (192.168.123.102 executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed): Fatal Python error: Segmentation fault

Current thread 0x00007ff85d338700 (most recent call first):
  File "/.../miniconda3/envs/python3.9/lib/python3.9/ctypes/__init__.py", line 525 in string_at
  File "/private/var/folders/0c/q8y15ybd3tn7sr2_jmbmftr80000gp/T/spark-397ac42b-c05b-4f50-a6b8-ede30254edc9/userFiles-fd70c41e-46b9-44ed-b781-f8dea10bcb4a/5ce3da24-912a-4207-af82-5dfc8a845714/malformed_daemon.py", line 8 in raise_segfault
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 1450 in main
  ...
  File "/.../miniconda3/envs/python3.9/lib/python3.9/runpy.py", line 197 in _run_module_as_main

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:550)
    at ...
    at java.base/java.io.DataInputStream.readInt(DataInputStream.java:393)
    at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:92)
    ... 30 more

Driver stacktrace:

JVM stacktrace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 0.0 failed 1 times, most recent failure: Lost task 4.0 in stage 0.0 (TID 4) (192.168.123.102 executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed): Fatal Python error: Segmentation fault

Current thread 0x00007ff85d338700 (most recent call first):
  File "/.../miniconda3/envs/python3.9/lib/python3.9/ctypes/__init__.py", line 525 in string_at
...
```
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #43600 from HyukjinKwon/more-segfault.
Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 840352eaf4a..d265bb2fd8b 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -542,12 +542,12 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
logError("This may have been caused by a prior exception:",
writer.exception.get)
throw writer.exception.get
- case eof: EOFException if faultHandlerEnabled && pid.isDefined &&
+ case e: IOException if faultHandlerEnabled && pid.isDefined &&
JavaFiles.exists(BasePythonRunner.faultHandlerLogPath(pid.get)) =>
val path = BasePythonRunner.faultHandlerLogPath(pid.get)
val error = String.join("\n", JavaFiles.readAllLines(path)) + "\n"
JavaFiles.deleteIfExists(path)
- throw new SparkException(s"Python worker exited unexpectedly
(crashed): $error", eof)
+ throw new SparkException(s"Python worker exited unexpectedly
(crashed): $error", e)
case eof: EOFException =>
throw new SparkException("Python worker exited unexpectedly
(crashed)", eof)