This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 17905ed39891 [SPARK-50909][PYTHON][4.0] Setup faulthandler in
PythonPlannerRunners
17905ed39891 is described below
commit 17905ed398910562faa656ee1842e7a7d8f2a826
Author: Takuya Ueshin <[email protected]>
AuthorDate: Sat Jan 25 16:16:31 2025 +0900
[SPARK-50909][PYTHON][4.0] Setup faulthandler in PythonPlannerRunners
### What changes were proposed in this pull request?
This is a backport of #49592.
Sets up `faulthandler` in `PythonPlannerRunner`s.
It can be enabled by the same config as UDFs.
- SQL conf: `spark.sql.execution.pyspark.udf.faulthandler.enabled`
- It falls back to the Spark conf: `spark.python.worker.faulthandler.enabled`
- `False` by default
### Why are the changes needed?
The `faulthandler` is not set up in `PythonPlannerRunner`s.
### Does this PR introduce _any_ user-facing change?
When enabled, if the Python worker crashes, a thread dump from the Python
process may be included in the error message on a best-effort basis.
### How was this patch tested?
Added the related tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #49635 from ueshin/issues/SPARK-50909/4.0/faulthandler.
Lead-authored-by: Takuya Ueshin <[email protected]>
Co-authored-by: Takuya UESHIN <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../org/apache/spark/api/python/PythonRunner.scala | 18 ++--
python/pyspark/sql/tests/test_python_datasource.py | 120 +++++++++++++++++++++
python/pyspark/sql/tests/test_udtf.py | 37 +++++++
python/pyspark/sql/worker/analyze_udtf.py | 12 +++
.../pyspark/sql/worker/commit_data_source_write.py | 12 +++
python/pyspark/sql/worker/create_data_source.py | 12 +++
python/pyspark/sql/worker/lookup_data_sources.py | 12 +++
python/pyspark/sql/worker/plan_data_source_read.py | 12 +++
.../sql/worker/python_streaming_sink_runner.py | 13 +++
.../pyspark/sql/worker/write_into_data_source.py | 12 +++
.../sql/execution/python/PythonPlannerRunner.scala | 29 ++++-
11 files changed, 275 insertions(+), 14 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index e3d10574419b..28950e5b41d4 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -33,7 +33,7 @@ import org.apache.spark._
import org.apache.spark.api.python.PythonFunction.PythonAccumulator
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys.TASK_NAME
-import org.apache.spark.internal.config.{BUFFER_SIZE, EXECUTOR_CORES, Python}
+import org.apache.spark.internal.config.{BUFFER_SIZE, EXECUTOR_CORES}
import org.apache.spark.internal.config.Python._
import org.apache.spark.rdd.InputFileBlockHolder
import
org.apache.spark.resource.ResourceProfile.{EXECUTOR_CORES_LOCAL_PROPERTY,
PYSPARK_MEMORY_LOCAL_PROPERTY}
@@ -90,11 +90,11 @@ private[spark] object PythonEvalType {
}
}
-private object BasePythonRunner {
+private[spark] object BasePythonRunner {
- private lazy val faultHandlerLogDir = Utils.createTempDir(namePrefix =
"faulthandler")
+ private[spark] lazy val faultHandlerLogDir = Utils.createTempDir(namePrefix
= "faulthandler")
- private def faultHandlerLogPath(pid: Int): Path = {
+ private[spark] def faultHandlerLogPath(pid: Int): Path = {
new File(faultHandlerLogDir, pid.toString).toPath
}
}
@@ -574,15 +574,15 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
JavaFiles.deleteIfExists(path)
throw new SparkException(s"Python worker exited unexpectedly
(crashed): $error", e)
- case eof: EOFException if !faultHandlerEnabled =>
+ case e: IOException if !faultHandlerEnabled =>
throw new SparkException(
s"Python worker exited unexpectedly (crashed). " +
"Consider setting
'spark.sql.execution.pyspark.udf.faulthandler.enabled' or" +
- s"'${Python.PYTHON_WORKER_FAULTHANLDER_ENABLED.key}' configuration
to 'true' for" +
- "the better Python traceback.", eof)
+ s"'${PYTHON_WORKER_FAULTHANLDER_ENABLED.key}' configuration to
'true' for " +
+ "the better Python traceback.", e)
- case eof: EOFException =>
- throw new SparkException("Python worker exited unexpectedly
(crashed)", eof)
+ case e: IOException =>
+ throw new SparkException("Python worker exited unexpectedly
(crashed)", e)
}
}
diff --git a/python/pyspark/sql/tests/test_python_datasource.py
b/python/pyspark/sql/tests/test_python_datasource.py
index 4d45c1c10a7d..9b132f5d693f 100644
--- a/python/pyspark/sql/tests/test_python_datasource.py
+++ b/python/pyspark/sql/tests/test_python_datasource.py
@@ -508,6 +508,126 @@ class BasePythonDataSourceTestsMixin:
):
df.write.format("test").mode("append").saveAsTable("test_table")
+ def test_data_source_segfault(self):
+ import ctypes
+
+ for enabled, expected in [
+ (True, "Segmentation fault"),
+ (False, "Consider setting .* for the better Python traceback."),
+ ]:
+ with self.subTest(enabled=enabled), self.sql_conf(
+ {"spark.sql.execution.pyspark.udf.faulthandler.enabled":
enabled}
+ ):
+ with
self.subTest(worker="pyspark.sql.worker.create_data_source"):
+
+ class TestDataSource(DataSource):
+ @classmethod
+ def name(cls):
+ return "test"
+
+ def schema(self):
+ return ctypes.string_at(0)
+
+ self.spark.dataSource.register(TestDataSource)
+
+ with self.assertRaisesRegex(Exception, expected):
+ self.spark.read.format("test").load().show()
+
+ with
self.subTest(worker="pyspark.sql.worker.plan_data_source_read"):
+
+ class TestDataSource(DataSource):
+ @classmethod
+ def name(cls):
+ return "test"
+
+ def schema(self):
+ return "x string"
+
+ def reader(self, schema):
+ return TestReader()
+
+ class TestReader(DataSourceReader):
+ def partitions(self):
+ ctypes.string_at(0)
+ return []
+
+ def read(self, partition):
+ return []
+
+ self.spark.dataSource.register(TestDataSource)
+
+ with self.assertRaisesRegex(Exception, expected):
+ self.spark.read.format("test").load().show()
+
+ with self.subTest(worker="pyspark.worker"):
+
+ class TestDataSource(DataSource):
+ @classmethod
+ def name(cls):
+ return "test"
+
+ def schema(self):
+ return "x string"
+
+ def reader(self, schema):
+ return TestReader()
+
+ class TestReader(DataSourceReader):
+ def read(self, partition):
+ ctypes.string_at(0)
+ yield "x",
+
+ self.spark.dataSource.register(TestDataSource)
+
+ with self.assertRaisesRegex(Exception, expected):
+ self.spark.read.format("test").load().show()
+
+ with
self.subTest(worker="pyspark.sql.worker.write_into_data_source"):
+
+ class TestDataSource(DataSource):
+ @classmethod
+ def name(cls):
+ return "test"
+
+ def writer(self, schema, overwrite):
+ return TestWriter()
+
+ class TestWriter(DataSourceWriter):
+ def write(self, iterator):
+ ctypes.string_at(0)
+ return WriterCommitMessage()
+
+ self.spark.dataSource.register(TestDataSource)
+
+ with self.assertRaisesRegex(Exception, expected):
+
self.spark.range(10).write.format("test").mode("append").saveAsTable(
+ "test_table"
+ )
+
+ with
self.subTest(worker="pyspark.sql.worker.commit_data_source_write"):
+
+ class TestDataSource(DataSource):
+ @classmethod
+ def name(cls):
+ return "test"
+
+ def writer(self, schema, overwrite):
+ return TestWriter()
+
+ class TestWriter(DataSourceWriter):
+ def write(self, iterator):
+ return WriterCommitMessage()
+
+ def commit(self, messages):
+ ctypes.string_at(0)
+
+ self.spark.dataSource.register(TestDataSource)
+
+ with self.assertRaisesRegex(Exception, expected):
+
self.spark.range(10).write.format("test").mode("append").saveAsTable(
+ "test_table"
+ )
+
class PythonDataSourceTests(BasePythonDataSourceTestsMixin, ReusedSQLTestCase):
...
diff --git a/python/pyspark/sql/tests/test_udtf.py
b/python/pyspark/sql/tests/test_udtf.py
index eca3ab0013b9..ef029adfb476 100644
--- a/python/pyspark/sql/tests/test_udtf.py
+++ b/python/pyspark/sql/tests/test_udtf.py
@@ -2761,6 +2761,43 @@ class BaseUDTFTestsMixin:
res = self.spark.sql("select i, to_json(v['v1']) from
test_udtf_struct(8)")
assertDataFrameEqual(res, [Row(i=n, s=f'{{"a":"{chr(99 + n)}"}}') for
n in range(8)])
+ def test_udtf_segfault(self):
+ for enabled, expected in [
+ (True, "Segmentation fault"),
+ (False, "Consider setting .* for the better Python traceback."),
+ ]:
+ with self.subTest(enabled=enabled), self.sql_conf(
+ {"spark.sql.execution.pyspark.udf.faulthandler.enabled":
enabled}
+ ):
+ with self.subTest(method="eval"):
+
+ class TestUDTF:
+ def eval(self):
+ import ctypes
+
+ yield ctypes.string_at(0),
+
+ self._check_result_or_exception(
+ TestUDTF, "x: string", expected, err_type=Exception
+ )
+
+ with self.subTest(method="analyze"):
+
+ class TestUDTFWithAnalyze:
+ @staticmethod
+ def analyze():
+ import ctypes
+
+ ctypes.string_at(0)
+ return AnalyzeResult(StructType().add("x",
StringType()))
+
+ def eval(self):
+ yield "x",
+
+ self._check_result_or_exception(
+ TestUDTFWithAnalyze, None, expected, err_type=Exception
+ )
+
class UDTFTests(BaseUDTFTestsMixin, ReusedSQLTestCase):
@classmethod
diff --git a/python/pyspark/sql/worker/analyze_udtf.py
b/python/pyspark/sql/worker/analyze_udtf.py
index 7dafb87c4221..9247fde78004 100644
--- a/python/pyspark/sql/worker/analyze_udtf.py
+++ b/python/pyspark/sql/worker/analyze_udtf.py
@@ -15,6 +15,7 @@
# limitations under the License.
#
+import faulthandler
import inspect
import os
import sys
@@ -106,7 +107,13 @@ def main(infile: IO, outfile: IO) -> None:
in JVM and receive the Python UDTF and its arguments for the `analyze`
static method,
and call the `analyze` static method, and send back a AnalyzeResult as a
result of the method.
"""
+ faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
try:
+ if faulthandler_log_path:
+ faulthandler_log_path = os.path.join(faulthandler_log_path,
str(os.getpid()))
+ faulthandler_log_file = open(faulthandler_log_path, "w")
+ faulthandler.enable(file=faulthandler_log_file)
+
check_python_version(infile)
memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB",
"-1"))
@@ -247,6 +254,11 @@ def main(infile: IO, outfile: IO) -> None:
except BaseException as e:
handle_worker_exception(e, outfile)
sys.exit(-1)
+ finally:
+ if faulthandler_log_path:
+ faulthandler.disable()
+ faulthandler_log_file.close()
+ os.remove(faulthandler_log_path)
send_accumulator_updates(outfile)
diff --git a/python/pyspark/sql/worker/commit_data_source_write.py
b/python/pyspark/sql/worker/commit_data_source_write.py
index 661e4c8aafd3..c891d9f083cb 100644
--- a/python/pyspark/sql/worker/commit_data_source_write.py
+++ b/python/pyspark/sql/worker/commit_data_source_write.py
@@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+import faulthandler
import os
import sys
from typing import IO
@@ -47,7 +48,13 @@ def main(infile: IO, outfile: IO) -> None:
responsible for invoking either the `commit` or the `abort` method on a
data source
writer instance, given a list of commit messages.
"""
+ faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
try:
+ if faulthandler_log_path:
+ faulthandler_log_path = os.path.join(faulthandler_log_path,
str(os.getpid()))
+ faulthandler_log_file = open(faulthandler_log_path, "w")
+ faulthandler.enable(file=faulthandler_log_file)
+
check_python_version(infile)
memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB",
"-1"))
@@ -93,6 +100,11 @@ def main(infile: IO, outfile: IO) -> None:
except BaseException as e:
handle_worker_exception(e, outfile)
sys.exit(-1)
+ finally:
+ if faulthandler_log_path:
+ faulthandler.disable()
+ faulthandler_log_file.close()
+ os.remove(faulthandler_log_path)
send_accumulator_updates(outfile)
diff --git a/python/pyspark/sql/worker/create_data_source.py
b/python/pyspark/sql/worker/create_data_source.py
index f74c1555e6e9..33957616c483 100644
--- a/python/pyspark/sql/worker/create_data_source.py
+++ b/python/pyspark/sql/worker/create_data_source.py
@@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+import faulthandler
import inspect
import os
import sys
@@ -60,7 +61,13 @@ def main(infile: IO, outfile: IO) -> None:
This process then creates a `DataSource` instance using the above
information and
sends the pickled instance as well as the schema back to the JVM.
"""
+ faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
try:
+ if faulthandler_log_path:
+ faulthandler_log_path = os.path.join(faulthandler_log_path,
str(os.getpid()))
+ faulthandler_log_file = open(faulthandler_log_path, "w")
+ faulthandler.enable(file=faulthandler_log_file)
+
check_python_version(infile)
memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB",
"-1"))
@@ -158,6 +165,11 @@ def main(infile: IO, outfile: IO) -> None:
except BaseException as e:
handle_worker_exception(e, outfile)
sys.exit(-1)
+ finally:
+ if faulthandler_log_path:
+ faulthandler.disable()
+ faulthandler_log_file.close()
+ os.remove(faulthandler_log_path)
send_accumulator_updates(outfile)
diff --git a/python/pyspark/sql/worker/lookup_data_sources.py
b/python/pyspark/sql/worker/lookup_data_sources.py
index 6da9d5925f63..18737095fa9c 100644
--- a/python/pyspark/sql/worker/lookup_data_sources.py
+++ b/python/pyspark/sql/worker/lookup_data_sources.py
@@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+import faulthandler
from importlib import import_module
from pkgutil import iter_modules
import os
@@ -50,7 +51,13 @@ def main(infile: IO, outfile: IO) -> None:
This is responsible for searching the available Python Data Sources so
they can be
statically registered automatically.
"""
+ faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
try:
+ if faulthandler_log_path:
+ faulthandler_log_path = os.path.join(faulthandler_log_path,
str(os.getpid()))
+ faulthandler_log_file = open(faulthandler_log_path, "w")
+ faulthandler.enable(file=faulthandler_log_file)
+
check_python_version(infile)
memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB",
"-1"))
@@ -78,6 +85,11 @@ def main(infile: IO, outfile: IO) -> None:
except BaseException as e:
handle_worker_exception(e, outfile)
sys.exit(-1)
+ finally:
+ if faulthandler_log_path:
+ faulthandler.disable()
+ faulthandler_log_file.close()
+ os.remove(faulthandler_log_path)
send_accumulator_updates(outfile)
diff --git a/python/pyspark/sql/worker/plan_data_source_read.py
b/python/pyspark/sql/worker/plan_data_source_read.py
index 2af25fb52f15..47be2de988a8 100644
--- a/python/pyspark/sql/worker/plan_data_source_read.py
+++ b/python/pyspark/sql/worker/plan_data_source_read.py
@@ -15,6 +15,7 @@
# limitations under the License.
#
+import faulthandler
import os
import sys
import functools
@@ -187,7 +188,13 @@ def main(infile: IO, outfile: IO) -> None:
The partition values and the Arrow Batch are then serialized and sent back
to the JVM
via the socket.
"""
+ faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
try:
+ if faulthandler_log_path:
+ faulthandler_log_path = os.path.join(faulthandler_log_path,
str(os.getpid()))
+ faulthandler_log_file = open(faulthandler_log_path, "w")
+ faulthandler.enable(file=faulthandler_log_file)
+
check_python_version(infile)
memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB",
"-1"))
@@ -351,6 +358,11 @@ def main(infile: IO, outfile: IO) -> None:
except BaseException as e:
handle_worker_exception(e, outfile)
sys.exit(-1)
+ finally:
+ if faulthandler_log_path:
+ faulthandler.disable()
+ faulthandler_log_file.close()
+ os.remove(faulthandler_log_path)
send_accumulator_updates(outfile)
diff --git a/python/pyspark/sql/worker/python_streaming_sink_runner.py
b/python/pyspark/sql/worker/python_streaming_sink_runner.py
index 0d46fc902121..c1bf5289cbf8 100644
--- a/python/pyspark/sql/worker/python_streaming_sink_runner.py
+++ b/python/pyspark/sql/worker/python_streaming_sink_runner.py
@@ -15,6 +15,7 @@
# limitations under the License.
#
+import faulthandler
import os
import sys
from typing import IO
@@ -55,7 +56,13 @@ def main(infile: IO, outfile: IO) -> None:
responsible for invoking either the `commit` or the `abort` method on a
data source
writer instance, given a list of commit messages.
"""
+ faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
try:
+ if faulthandler_log_path:
+ faulthandler_log_path = os.path.join(faulthandler_log_path,
str(os.getpid()))
+ faulthandler_log_file = open(faulthandler_log_path, "w")
+ faulthandler.enable(file=faulthandler_log_file)
+
check_python_version(infile)
setup_spark_files(infile)
setup_broadcasts(infile)
@@ -130,6 +137,12 @@ def main(infile: IO, outfile: IO) -> None:
except BaseException as e:
handle_worker_exception(e, outfile)
sys.exit(-1)
+ finally:
+ if faulthandler_log_path:
+ faulthandler.disable()
+ faulthandler_log_file.close()
+ os.remove(faulthandler_log_path)
+
send_accumulator_updates(outfile)
# check end of stream
diff --git a/python/pyspark/sql/worker/write_into_data_source.py
b/python/pyspark/sql/worker/write_into_data_source.py
index 91a1f4d3b1b3..91043ee1c84d 100644
--- a/python/pyspark/sql/worker/write_into_data_source.py
+++ b/python/pyspark/sql/worker/write_into_data_source.py
@@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+import faulthandler
import inspect
import os
import sys
@@ -74,7 +75,13 @@ def main(infile: IO, outfile: IO) -> None:
instance and send a function using the writer instance that can be used
in mapInPandas/mapInArrow back to the JVM.
"""
+ faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
try:
+ if faulthandler_log_path:
+ faulthandler_log_path = os.path.join(faulthandler_log_path,
str(os.getpid()))
+ faulthandler_log_file = open(faulthandler_log_path, "w")
+ faulthandler.enable(file=faulthandler_log_file)
+
check_python_version(infile)
memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB",
"-1"))
@@ -229,6 +236,11 @@ def main(infile: IO, outfile: IO) -> None:
except BaseException as e:
handle_worker_exception(e, outfile)
sys.exit(-1)
+ finally:
+ if faulthandler_log_path:
+ faulthandler.disable()
+ faulthandler_log_file.close()
+ os.remove(faulthandler_log_path)
send_accumulator_updates(outfile)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonPlannerRunner.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonPlannerRunner.scala
index 63c98b3002aa..8cc2e1de7a4c 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonPlannerRunner.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonPlannerRunner.scala
@@ -17,9 +17,10 @@
package org.apache.spark.sql.execution.python
-import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream,
DataOutputStream, EOFException, InputStream}
+import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream,
DataOutputStream, InputStream, IOException}
import java.nio.ByteBuffer
import java.nio.channels.SelectionKey
+import java.nio.file.{Files => JavaFiles}
import java.util.HashMap
import scala.jdk.CollectionConverters._
@@ -27,7 +28,7 @@ import scala.jdk.CollectionConverters._
import net.razorvine.pickle.Pickler
import org.apache.spark.{JobArtifactSet, SparkEnv, SparkException}
-import org.apache.spark.api.python.{PythonFunction, PythonWorker,
PythonWorkerUtils, SpecialLengths}
+import org.apache.spark.api.python.{BasePythonRunner, PythonFunction,
PythonWorker, PythonWorkerUtils, SpecialLengths}
import org.apache.spark.internal.config.BUFFER_SIZE
import org.apache.spark.internal.config.Python._
import org.apache.spark.sql.internal.SQLConf
@@ -50,6 +51,7 @@ abstract class PythonPlannerRunner[T](func: PythonFunction) {
val authSocketTimeout = env.conf.get(PYTHON_AUTH_SOCKET_TIMEOUT)
val reuseWorker = env.conf.get(PYTHON_WORKER_REUSE)
val localdir = env.blockManager.diskBlockManager.localDirs.map(f =>
f.getPath()).mkString(",")
+ val faultHandlerEnabled: Boolean =
SQLConf.get.pythonUDFWorkerFaulthandlerEnabled
val simplifiedTraceback: Boolean = SQLConf.get.pysparkSimplifiedTraceback
val workerMemoryMb = SQLConf.get.pythonPlannerExecMemory
@@ -74,6 +76,9 @@ abstract class PythonPlannerRunner[T](func: PythonFunction) {
}
envVars.put("SPARK_AUTH_SOCKET_TIMEOUT", authSocketTimeout.toString)
envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString)
+ if (faultHandlerEnabled) {
+ envVars.put("PYTHON_FAULTHANDLER_DIR",
BasePythonRunner.faultHandlerLogDir.toString)
+ }
envVars.put("SPARK_JOB_ARTIFACT_UUID",
jobArtifactUUID.getOrElse("default"))
@@ -81,7 +86,7 @@ abstract class PythonPlannerRunner[T](func: PythonFunction) {
val pickler = new Pickler(/* useMemo = */ true,
/* valueCompare = */ false)
- val (worker: PythonWorker, _) =
+ val (worker: PythonWorker, pid: Option[Int]) =
env.createPythonWorker(pythonExec, workerModule, envVars.asScala.toMap,
useDaemon)
var releasedOrClosed = false
val bufferStream = new DirectByteBufferOutputStream()
@@ -115,8 +120,22 @@ abstract class PythonPlannerRunner[T](func:
PythonFunction) {
res
} catch {
- case eof: EOFException =>
- throw new SparkException("Python worker exited unexpectedly
(crashed)", eof)
+ case e: IOException if faultHandlerEnabled && pid.isDefined &&
+ JavaFiles.exists(BasePythonRunner.faultHandlerLogPath(pid.get)) =>
+ val path = BasePythonRunner.faultHandlerLogPath(pid.get)
+ val error = String.join("\n", JavaFiles.readAllLines(path)) + "\n"
+ JavaFiles.deleteIfExists(path)
+ throw new SparkException(s"Python worker exited unexpectedly
(crashed): $error", e)
+
+ case e: IOException if !faultHandlerEnabled =>
+ throw new SparkException(
+ s"Python worker exited unexpectedly (crashed). " +
+ "Consider setting
'spark.sql.execution.pyspark.udf.faulthandler.enabled' or" +
+ s"'${PYTHON_WORKER_FAULTHANLDER_ENABLED.key}' configuration to
'true' for " +
+ "the better Python traceback.", e)
+
+ case e: IOException =>
+ throw new SparkException("Python worker exited unexpectedly
(crashed)", e)
} finally {
try {
bufferStream.close()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]