This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 17905ed39891 [SPARK-50909][PYTHON][4.0] Setup faulthandler in 
PythonPlannerRunners
17905ed39891 is described below

commit 17905ed398910562faa656ee1842e7a7d8f2a826
Author: Takuya Ueshin <[email protected]>
AuthorDate: Sat Jan 25 16:16:31 2025 +0900

    [SPARK-50909][PYTHON][4.0] Setup faulthandler in PythonPlannerRunners
    
    ### What changes were proposed in this pull request?
    
    This is a backport of #49592.
    
    Setups `faulthandler` in `PythonPlannerRunner`s.
    
    It can be enabled by the same config as UDFs.
    
    - SQL conf: `spark.sql.execution.pyspark.udf.faulthandler.enabled`
    - It fallback to Spark conf: `spark.python.worker.faulthandler.enabled`
    - `False` by default
    
    ### Why are the changes needed?
    
    The `faulthandler` is not set up in `PythonPlannerRunner`s.
    
    ### Does this PR introduce _any_ user-facing change?
    
    When enabled, if Python worker crashes, it may generate thread-dump in the 
error message on the best-effort basis of Python process.
    
    ### How was this patch tested?
    
    Added the related tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #49635 from ueshin/issues/SPARK-50909/4.0/faulthandler.
    
    Lead-authored-by: Takuya Ueshin <[email protected]>
    Co-authored-by: Takuya UESHIN <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../org/apache/spark/api/python/PythonRunner.scala |  18 ++--
 python/pyspark/sql/tests/test_python_datasource.py | 120 +++++++++++++++++++++
 python/pyspark/sql/tests/test_udtf.py              |  37 +++++++
 python/pyspark/sql/worker/analyze_udtf.py          |  12 +++
 .../pyspark/sql/worker/commit_data_source_write.py |  12 +++
 python/pyspark/sql/worker/create_data_source.py    |  12 +++
 python/pyspark/sql/worker/lookup_data_sources.py   |  12 +++
 python/pyspark/sql/worker/plan_data_source_read.py |  12 +++
 .../sql/worker/python_streaming_sink_runner.py     |  13 +++
 .../pyspark/sql/worker/write_into_data_source.py   |  12 +++
 .../sql/execution/python/PythonPlannerRunner.scala |  29 ++++-
 11 files changed, 275 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index e3d10574419b..28950e5b41d4 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -33,7 +33,7 @@ import org.apache.spark._
 import org.apache.spark.api.python.PythonFunction.PythonAccumulator
 import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.internal.LogKeys.TASK_NAME
-import org.apache.spark.internal.config.{BUFFER_SIZE, EXECUTOR_CORES, Python}
+import org.apache.spark.internal.config.{BUFFER_SIZE, EXECUTOR_CORES}
 import org.apache.spark.internal.config.Python._
 import org.apache.spark.rdd.InputFileBlockHolder
 import 
org.apache.spark.resource.ResourceProfile.{EXECUTOR_CORES_LOCAL_PROPERTY, 
PYSPARK_MEMORY_LOCAL_PROPERTY}
@@ -90,11 +90,11 @@ private[spark] object PythonEvalType {
   }
 }
 
-private object BasePythonRunner {
+private[spark] object BasePythonRunner {
 
-  private lazy val faultHandlerLogDir = Utils.createTempDir(namePrefix = 
"faulthandler")
+  private[spark] lazy val faultHandlerLogDir = Utils.createTempDir(namePrefix 
= "faulthandler")
 
-  private def faultHandlerLogPath(pid: Int): Path = {
+  private[spark] def faultHandlerLogPath(pid: Int): Path = {
     new File(faultHandlerLogDir, pid.toString).toPath
   }
 }
@@ -574,15 +574,15 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
         JavaFiles.deleteIfExists(path)
         throw new SparkException(s"Python worker exited unexpectedly 
(crashed): $error", e)
 
-      case eof: EOFException if !faultHandlerEnabled =>
+      case e: IOException if !faultHandlerEnabled =>
         throw new SparkException(
           s"Python worker exited unexpectedly (crashed). " +
             "Consider setting 
'spark.sql.execution.pyspark.udf.faulthandler.enabled' or" +
-            s"'${Python.PYTHON_WORKER_FAULTHANLDER_ENABLED.key}' configuration 
to 'true' for" +
-            "the better Python traceback.", eof)
+            s"'${PYTHON_WORKER_FAULTHANLDER_ENABLED.key}' configuration to 
'true' for " +
+            "the better Python traceback.", e)
 
-      case eof: EOFException =>
-        throw new SparkException("Python worker exited unexpectedly 
(crashed)", eof)
+      case e: IOException =>
+        throw new SparkException("Python worker exited unexpectedly 
(crashed)", e)
     }
   }
 
diff --git a/python/pyspark/sql/tests/test_python_datasource.py 
b/python/pyspark/sql/tests/test_python_datasource.py
index 4d45c1c10a7d..9b132f5d693f 100644
--- a/python/pyspark/sql/tests/test_python_datasource.py
+++ b/python/pyspark/sql/tests/test_python_datasource.py
@@ -508,6 +508,126 @@ class BasePythonDataSourceTestsMixin:
         ):
             df.write.format("test").mode("append").saveAsTable("test_table")
 
+    def test_data_source_segfault(self):
+        import ctypes
+
+        for enabled, expected in [
+            (True, "Segmentation fault"),
+            (False, "Consider setting .* for the better Python traceback."),
+        ]:
+            with self.subTest(enabled=enabled), self.sql_conf(
+                {"spark.sql.execution.pyspark.udf.faulthandler.enabled": 
enabled}
+            ):
+                with 
self.subTest(worker="pyspark.sql.worker.create_data_source"):
+
+                    class TestDataSource(DataSource):
+                        @classmethod
+                        def name(cls):
+                            return "test"
+
+                        def schema(self):
+                            return ctypes.string_at(0)
+
+                    self.spark.dataSource.register(TestDataSource)
+
+                    with self.assertRaisesRegex(Exception, expected):
+                        self.spark.read.format("test").load().show()
+
+                with 
self.subTest(worker="pyspark.sql.worker.plan_data_source_read"):
+
+                    class TestDataSource(DataSource):
+                        @classmethod
+                        def name(cls):
+                            return "test"
+
+                        def schema(self):
+                            return "x string"
+
+                        def reader(self, schema):
+                            return TestReader()
+
+                    class TestReader(DataSourceReader):
+                        def partitions(self):
+                            ctypes.string_at(0)
+                            return []
+
+                        def read(self, partition):
+                            return []
+
+                    self.spark.dataSource.register(TestDataSource)
+
+                    with self.assertRaisesRegex(Exception, expected):
+                        self.spark.read.format("test").load().show()
+
+                with self.subTest(worker="pyspark.worker"):
+
+                    class TestDataSource(DataSource):
+                        @classmethod
+                        def name(cls):
+                            return "test"
+
+                        def schema(self):
+                            return "x string"
+
+                        def reader(self, schema):
+                            return TestReader()
+
+                    class TestReader(DataSourceReader):
+                        def read(self, partition):
+                            ctypes.string_at(0)
+                            yield "x",
+
+                    self.spark.dataSource.register(TestDataSource)
+
+                    with self.assertRaisesRegex(Exception, expected):
+                        self.spark.read.format("test").load().show()
+
+                with 
self.subTest(worker="pyspark.sql.worker.write_into_data_source"):
+
+                    class TestDataSource(DataSource):
+                        @classmethod
+                        def name(cls):
+                            return "test"
+
+                        def writer(self, schema, overwrite):
+                            return TestWriter()
+
+                    class TestWriter(DataSourceWriter):
+                        def write(self, iterator):
+                            ctypes.string_at(0)
+                            return WriterCommitMessage()
+
+                    self.spark.dataSource.register(TestDataSource)
+
+                    with self.assertRaisesRegex(Exception, expected):
+                        
self.spark.range(10).write.format("test").mode("append").saveAsTable(
+                            "test_table"
+                        )
+
+                with 
self.subTest(worker="pyspark.sql.worker.commit_data_source_write"):
+
+                    class TestDataSource(DataSource):
+                        @classmethod
+                        def name(cls):
+                            return "test"
+
+                        def writer(self, schema, overwrite):
+                            return TestWriter()
+
+                    class TestWriter(DataSourceWriter):
+                        def write(self, iterator):
+                            return WriterCommitMessage()
+
+                        def commit(self, messages):
+                            ctypes.string_at(0)
+
+                    self.spark.dataSource.register(TestDataSource)
+
+                    with self.assertRaisesRegex(Exception, expected):
+                        
self.spark.range(10).write.format("test").mode("append").saveAsTable(
+                            "test_table"
+                        )
+
 
 class PythonDataSourceTests(BasePythonDataSourceTestsMixin, ReusedSQLTestCase):
     ...
diff --git a/python/pyspark/sql/tests/test_udtf.py 
b/python/pyspark/sql/tests/test_udtf.py
index eca3ab0013b9..ef029adfb476 100644
--- a/python/pyspark/sql/tests/test_udtf.py
+++ b/python/pyspark/sql/tests/test_udtf.py
@@ -2761,6 +2761,43 @@ class BaseUDTFTestsMixin:
         res = self.spark.sql("select i, to_json(v['v1']) from 
test_udtf_struct(8)")
         assertDataFrameEqual(res, [Row(i=n, s=f'{{"a":"{chr(99 + n)}"}}') for 
n in range(8)])
 
+    def test_udtf_segfault(self):
+        for enabled, expected in [
+            (True, "Segmentation fault"),
+            (False, "Consider setting .* for the better Python traceback."),
+        ]:
+            with self.subTest(enabled=enabled), self.sql_conf(
+                {"spark.sql.execution.pyspark.udf.faulthandler.enabled": 
enabled}
+            ):
+                with self.subTest(method="eval"):
+
+                    class TestUDTF:
+                        def eval(self):
+                            import ctypes
+
+                            yield ctypes.string_at(0),
+
+                    self._check_result_or_exception(
+                        TestUDTF, "x: string", expected, err_type=Exception
+                    )
+
+                with self.subTest(method="analyze"):
+
+                    class TestUDTFWithAnalyze:
+                        @staticmethod
+                        def analyze():
+                            import ctypes
+
+                            ctypes.string_at(0)
+                            return AnalyzeResult(StructType().add("x", 
StringType()))
+
+                        def eval(self):
+                            yield "x",
+
+                    self._check_result_or_exception(
+                        TestUDTFWithAnalyze, None, expected, err_type=Exception
+                    )
+
 
 class UDTFTests(BaseUDTFTestsMixin, ReusedSQLTestCase):
     @classmethod
diff --git a/python/pyspark/sql/worker/analyze_udtf.py 
b/python/pyspark/sql/worker/analyze_udtf.py
index 7dafb87c4221..9247fde78004 100644
--- a/python/pyspark/sql/worker/analyze_udtf.py
+++ b/python/pyspark/sql/worker/analyze_udtf.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 #
 
+import faulthandler
 import inspect
 import os
 import sys
@@ -106,7 +107,13 @@ def main(infile: IO, outfile: IO) -> None:
     in JVM and receive the Python UDTF and its arguments for the `analyze` 
static method,
     and call the `analyze` static method, and send back a AnalyzeResult as a 
result of the method.
     """
+    faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
     try:
+        if faulthandler_log_path:
+            faulthandler_log_path = os.path.join(faulthandler_log_path, 
str(os.getpid()))
+            faulthandler_log_file = open(faulthandler_log_path, "w")
+            faulthandler.enable(file=faulthandler_log_file)
+
         check_python_version(infile)
 
         memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", 
"-1"))
@@ -247,6 +254,11 @@ def main(infile: IO, outfile: IO) -> None:
     except BaseException as e:
         handle_worker_exception(e, outfile)
         sys.exit(-1)
+    finally:
+        if faulthandler_log_path:
+            faulthandler.disable()
+            faulthandler_log_file.close()
+            os.remove(faulthandler_log_path)
 
     send_accumulator_updates(outfile)
 
diff --git a/python/pyspark/sql/worker/commit_data_source_write.py 
b/python/pyspark/sql/worker/commit_data_source_write.py
index 661e4c8aafd3..c891d9f083cb 100644
--- a/python/pyspark/sql/worker/commit_data_source_write.py
+++ b/python/pyspark/sql/worker/commit_data_source_write.py
@@ -14,6 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+import faulthandler
 import os
 import sys
 from typing import IO
@@ -47,7 +48,13 @@ def main(infile: IO, outfile: IO) -> None:
     responsible for invoking either the `commit` or the `abort` method on a 
data source
     writer instance, given a list of commit messages.
     """
+    faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
     try:
+        if faulthandler_log_path:
+            faulthandler_log_path = os.path.join(faulthandler_log_path, 
str(os.getpid()))
+            faulthandler_log_file = open(faulthandler_log_path, "w")
+            faulthandler.enable(file=faulthandler_log_file)
+
         check_python_version(infile)
 
         memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", 
"-1"))
@@ -93,6 +100,11 @@ def main(infile: IO, outfile: IO) -> None:
     except BaseException as e:
         handle_worker_exception(e, outfile)
         sys.exit(-1)
+    finally:
+        if faulthandler_log_path:
+            faulthandler.disable()
+            faulthandler_log_file.close()
+            os.remove(faulthandler_log_path)
 
     send_accumulator_updates(outfile)
 
diff --git a/python/pyspark/sql/worker/create_data_source.py 
b/python/pyspark/sql/worker/create_data_source.py
index f74c1555e6e9..33957616c483 100644
--- a/python/pyspark/sql/worker/create_data_source.py
+++ b/python/pyspark/sql/worker/create_data_source.py
@@ -14,6 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+import faulthandler
 import inspect
 import os
 import sys
@@ -60,7 +61,13 @@ def main(infile: IO, outfile: IO) -> None:
     This process then creates a `DataSource` instance using the above 
information and
     sends the pickled instance as well as the schema back to the JVM.
     """
+    faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
     try:
+        if faulthandler_log_path:
+            faulthandler_log_path = os.path.join(faulthandler_log_path, 
str(os.getpid()))
+            faulthandler_log_file = open(faulthandler_log_path, "w")
+            faulthandler.enable(file=faulthandler_log_file)
+
         check_python_version(infile)
 
         memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", 
"-1"))
@@ -158,6 +165,11 @@ def main(infile: IO, outfile: IO) -> None:
     except BaseException as e:
         handle_worker_exception(e, outfile)
         sys.exit(-1)
+    finally:
+        if faulthandler_log_path:
+            faulthandler.disable()
+            faulthandler_log_file.close()
+            os.remove(faulthandler_log_path)
 
     send_accumulator_updates(outfile)
 
diff --git a/python/pyspark/sql/worker/lookup_data_sources.py 
b/python/pyspark/sql/worker/lookup_data_sources.py
index 6da9d5925f63..18737095fa9c 100644
--- a/python/pyspark/sql/worker/lookup_data_sources.py
+++ b/python/pyspark/sql/worker/lookup_data_sources.py
@@ -14,6 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+import faulthandler
 from importlib import import_module
 from pkgutil import iter_modules
 import os
@@ -50,7 +51,13 @@ def main(infile: IO, outfile: IO) -> None:
     This is responsible for searching the available Python Data Sources so 
they can be
     statically registered automatically.
     """
+    faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
     try:
+        if faulthandler_log_path:
+            faulthandler_log_path = os.path.join(faulthandler_log_path, 
str(os.getpid()))
+            faulthandler_log_file = open(faulthandler_log_path, "w")
+            faulthandler.enable(file=faulthandler_log_file)
+
         check_python_version(infile)
 
         memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", 
"-1"))
@@ -78,6 +85,11 @@ def main(infile: IO, outfile: IO) -> None:
     except BaseException as e:
         handle_worker_exception(e, outfile)
         sys.exit(-1)
+    finally:
+        if faulthandler_log_path:
+            faulthandler.disable()
+            faulthandler_log_file.close()
+            os.remove(faulthandler_log_path)
 
     send_accumulator_updates(outfile)
 
diff --git a/python/pyspark/sql/worker/plan_data_source_read.py 
b/python/pyspark/sql/worker/plan_data_source_read.py
index 2af25fb52f15..47be2de988a8 100644
--- a/python/pyspark/sql/worker/plan_data_source_read.py
+++ b/python/pyspark/sql/worker/plan_data_source_read.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 #
 
+import faulthandler
 import os
 import sys
 import functools
@@ -187,7 +188,13 @@ def main(infile: IO, outfile: IO) -> None:
     The partition values and the Arrow Batch are then serialized and sent back 
to the JVM
     via the socket.
     """
+    faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
     try:
+        if faulthandler_log_path:
+            faulthandler_log_path = os.path.join(faulthandler_log_path, 
str(os.getpid()))
+            faulthandler_log_file = open(faulthandler_log_path, "w")
+            faulthandler.enable(file=faulthandler_log_file)
+
         check_python_version(infile)
 
         memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", 
"-1"))
@@ -351,6 +358,11 @@ def main(infile: IO, outfile: IO) -> None:
     except BaseException as e:
         handle_worker_exception(e, outfile)
         sys.exit(-1)
+    finally:
+        if faulthandler_log_path:
+            faulthandler.disable()
+            faulthandler_log_file.close()
+            os.remove(faulthandler_log_path)
 
     send_accumulator_updates(outfile)
 
diff --git a/python/pyspark/sql/worker/python_streaming_sink_runner.py 
b/python/pyspark/sql/worker/python_streaming_sink_runner.py
index 0d46fc902121..c1bf5289cbf8 100644
--- a/python/pyspark/sql/worker/python_streaming_sink_runner.py
+++ b/python/pyspark/sql/worker/python_streaming_sink_runner.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 #
 
+import faulthandler
 import os
 import sys
 from typing import IO
@@ -55,7 +56,13 @@ def main(infile: IO, outfile: IO) -> None:
     responsible for invoking either the `commit` or the `abort` method on a 
data source
     writer instance, given a list of commit messages.
     """
+    faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
     try:
+        if faulthandler_log_path:
+            faulthandler_log_path = os.path.join(faulthandler_log_path, 
str(os.getpid()))
+            faulthandler_log_file = open(faulthandler_log_path, "w")
+            faulthandler.enable(file=faulthandler_log_file)
+
         check_python_version(infile)
         setup_spark_files(infile)
         setup_broadcasts(infile)
@@ -130,6 +137,12 @@ def main(infile: IO, outfile: IO) -> None:
     except BaseException as e:
         handle_worker_exception(e, outfile)
         sys.exit(-1)
+    finally:
+        if faulthandler_log_path:
+            faulthandler.disable()
+            faulthandler_log_file.close()
+            os.remove(faulthandler_log_path)
+
     send_accumulator_updates(outfile)
 
     # check end of stream
diff --git a/python/pyspark/sql/worker/write_into_data_source.py 
b/python/pyspark/sql/worker/write_into_data_source.py
index 91a1f4d3b1b3..91043ee1c84d 100644
--- a/python/pyspark/sql/worker/write_into_data_source.py
+++ b/python/pyspark/sql/worker/write_into_data_source.py
@@ -14,6 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+import faulthandler
 import inspect
 import os
 import sys
@@ -74,7 +75,13 @@ def main(infile: IO, outfile: IO) -> None:
     instance and send a function using the writer instance that can be used
     in mapInPandas/mapInArrow back to the JVM.
     """
+    faulthandler_log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
     try:
+        if faulthandler_log_path:
+            faulthandler_log_path = os.path.join(faulthandler_log_path, 
str(os.getpid()))
+            faulthandler_log_file = open(faulthandler_log_path, "w")
+            faulthandler.enable(file=faulthandler_log_file)
+
         check_python_version(infile)
 
         memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", 
"-1"))
@@ -229,6 +236,11 @@ def main(infile: IO, outfile: IO) -> None:
     except BaseException as e:
         handle_worker_exception(e, outfile)
         sys.exit(-1)
+    finally:
+        if faulthandler_log_path:
+            faulthandler.disable()
+            faulthandler_log_file.close()
+            os.remove(faulthandler_log_path)
 
     send_accumulator_updates(outfile)
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonPlannerRunner.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonPlannerRunner.scala
index 63c98b3002aa..8cc2e1de7a4c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonPlannerRunner.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonPlannerRunner.scala
@@ -17,9 +17,10 @@
 
 package org.apache.spark.sql.execution.python
 
-import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, 
DataOutputStream, EOFException, InputStream}
+import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, 
DataOutputStream, InputStream, IOException}
 import java.nio.ByteBuffer
 import java.nio.channels.SelectionKey
+import java.nio.file.{Files => JavaFiles}
 import java.util.HashMap
 
 import scala.jdk.CollectionConverters._
@@ -27,7 +28,7 @@ import scala.jdk.CollectionConverters._
 import net.razorvine.pickle.Pickler
 
 import org.apache.spark.{JobArtifactSet, SparkEnv, SparkException}
-import org.apache.spark.api.python.{PythonFunction, PythonWorker, 
PythonWorkerUtils, SpecialLengths}
+import org.apache.spark.api.python.{BasePythonRunner, PythonFunction, 
PythonWorker, PythonWorkerUtils, SpecialLengths}
 import org.apache.spark.internal.config.BUFFER_SIZE
 import org.apache.spark.internal.config.Python._
 import org.apache.spark.sql.internal.SQLConf
@@ -50,6 +51,7 @@ abstract class PythonPlannerRunner[T](func: PythonFunction) {
     val authSocketTimeout = env.conf.get(PYTHON_AUTH_SOCKET_TIMEOUT)
     val reuseWorker = env.conf.get(PYTHON_WORKER_REUSE)
     val localdir = env.blockManager.diskBlockManager.localDirs.map(f => 
f.getPath()).mkString(",")
+    val faultHandlerEnabled: Boolean = 
SQLConf.get.pythonUDFWorkerFaulthandlerEnabled
     val simplifiedTraceback: Boolean = SQLConf.get.pysparkSimplifiedTraceback
     val workerMemoryMb = SQLConf.get.pythonPlannerExecMemory
 
@@ -74,6 +76,9 @@ abstract class PythonPlannerRunner[T](func: PythonFunction) {
     }
     envVars.put("SPARK_AUTH_SOCKET_TIMEOUT", authSocketTimeout.toString)
     envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString)
+    if (faultHandlerEnabled) {
+      envVars.put("PYTHON_FAULTHANDLER_DIR", 
BasePythonRunner.faultHandlerLogDir.toString)
+    }
 
     envVars.put("SPARK_JOB_ARTIFACT_UUID", 
jobArtifactUUID.getOrElse("default"))
 
@@ -81,7 +86,7 @@ abstract class PythonPlannerRunner[T](func: PythonFunction) {
     val pickler = new Pickler(/* useMemo = */ true,
       /* valueCompare = */ false)
 
-    val (worker: PythonWorker, _) =
+    val (worker: PythonWorker, pid: Option[Int]) =
       env.createPythonWorker(pythonExec, workerModule, envVars.asScala.toMap, 
useDaemon)
     var releasedOrClosed = false
     val bufferStream = new DirectByteBufferOutputStream()
@@ -115,8 +120,22 @@ abstract class PythonPlannerRunner[T](func: 
PythonFunction) {
 
       res
     } catch {
-      case eof: EOFException =>
-        throw new SparkException("Python worker exited unexpectedly 
(crashed)", eof)
+      case e: IOException if faultHandlerEnabled && pid.isDefined &&
+        JavaFiles.exists(BasePythonRunner.faultHandlerLogPath(pid.get)) =>
+        val path = BasePythonRunner.faultHandlerLogPath(pid.get)
+        val error = String.join("\n", JavaFiles.readAllLines(path)) + "\n"
+        JavaFiles.deleteIfExists(path)
+        throw new SparkException(s"Python worker exited unexpectedly 
(crashed): $error", e)
+
+      case e: IOException if !faultHandlerEnabled =>
+        throw new SparkException(
+          s"Python worker exited unexpectedly (crashed). " +
+            "Consider setting 
'spark.sql.execution.pyspark.udf.faulthandler.enabled' or" +
+            s"'${PYTHON_WORKER_FAULTHANLDER_ENABLED.key}' configuration to 
'true' for " +
+            "the better Python traceback.", e)
+
+      case e: IOException =>
+        throw new SparkException("Python worker exited unexpectedly 
(crashed)", e)
     } finally {
       try {
         bufferStream.close()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to