This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new a5a8ec2  [SPARK-31849][PYTHON][SQL] Make PySpark SQL exceptions more Pythonic
a5a8ec2 is described below

commit a5a8ec2ca8e75207b4e6c7b76ab6214be4a4237e
Author: HyukjinKwon <gurwls...@apache.org>
AuthorDate: Mon Jun 1 09:45:21 2020 +0900

    [SPARK-31849][PYTHON][SQL] Make PySpark SQL exceptions more Pythonic
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to make PySpark exceptions more Pythonic by hiding the JVM stacktrace by default. The JVM stacktrace can still be shown by turning on the `spark.sql.pyspark.jvmStacktrace.enabled` configuration.
    
    ```
    Traceback (most recent call last):
      ...
    pyspark.sql.utils.PythonException:
      An exception was thrown from Python worker in the executor. The below is the Python worker stacktrace.
    Traceback (most recent call last):
      ...
    ```
    
    If `spark.sql.pyspark.jvmStacktrace.enabled` is turned on, the JVM stacktrace is appended:
    
    ```
    JVM stacktrace:
    org.apache.spark.Exception: ...
      ...
    ```
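    
    The flag is a regular runtime SQL configuration, so besides passing `--conf` at launch it can also be toggled per session; a minimal sketch (assuming an active `spark` session, and that the flag behaves like other runtime SQL confs):
    
    ```python
    # Illustration only: assumes an active SparkSession named `spark`.
    # Show the JVM stacktrace in addition to the Python one.
    spark.conf.set("spark.sql.pyspark.jvmStacktrace.enabled", "true")

    # Back to the default, Python-only behaviour.
    spark.conf.set("spark.sql.pyspark.jvmStacktrace.enabled", "false")
    ```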
    
    For example, the code below:
    
    ```python
    from pyspark.sql.functions import udf

    @udf
    def divide_by_zero(v):
        raise v / 0  # evaluating v / 0 raises ZeroDivisionError on the executor

    spark.range(1).select(divide_by_zero("id")).show()
    ```
    
    will show an error message that looks like a Python exception thrown locally.
    
    <details>
    <summary>Python exception message when <code>spark.sql.pyspark.jvmStacktrace.enabled</code> is off (default)</summary>
    
    ```
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/.../spark/python/pyspark/sql/dataframe.py", line 427, in show
        print(self._jdf.showString(n, 20, vertical))
      File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", 
line 1305, in __call__
      File "/.../spark/python/pyspark/sql/utils.py", line 131, in deco
        raise_from(converted)
      File "<string>", line 3, in raise_from
    pyspark.sql.utils.PythonException:
      An exception was thrown from Python worker in the executor. The below is 
the Python worker stacktrace.
    Traceback (most recent call last):
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in 
main
        process()
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in 
process
        serializer.dump_stream(out_iter, outfile)
      File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 
223, in dump_stream
        self.serializer.dump_stream(self._batched(iterator), stream)
      File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 
141, in dump_stream
        for obj in iterator:
      File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 
212, in _batched
        for item in iterator:
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in 
mapper
        result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in 
udfs)
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in 
<genexpr>
        result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in 
udfs)
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 90, in 
<lambda>
        return lambda *a: f(*a)
      File "/.../spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in 
wrapper
        return f(*args, **kwargs)
      File "<stdin>", line 3, in divide_by_zero
    ZeroDivisionError: division by zero
    ```
    
    </details>
    
    <details>
    <summary>Python exception message when <code>spark.sql.pyspark.jvmStacktrace.enabled</code> is on</summary>
    
    ```
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/.../spark/python/pyspark/sql/dataframe.py", line 427, in show
        print(self._jdf.showString(n, 20, vertical))
      File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", 
line 1305, in __call__
      File "/.../spark/python/pyspark/sql/utils.py", line 137, in deco
        raise_from(converted)
      File "<string>", line 3, in raise_from
    pyspark.sql.utils.PythonException:
      An exception was thrown from Python worker in the executor. The below is 
the Python worker stacktrace.
    Traceback (most recent call last):
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in 
main
        process()
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in 
process
        serializer.dump_stream(out_iter, outfile)
      File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 
223, in dump_stream
        self.serializer.dump_stream(self._batched(iterator), stream)
      File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 
141, in dump_stream
        for obj in iterator:
      File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 
212, in _batched
        for item in iterator:
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in 
mapper
        result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in 
udfs)
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in 
<genexpr>
        result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in 
udfs)
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 90, in 
<lambda>
        return lambda *a: f(*a)
      File "/.../spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in 
wrapper
        return f(*args, **kwargs)
      File "<stdin>", line 3, in divide_by_zero
    ZeroDivisionError: division by zero
    
    JVM stacktrace:
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 
in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 
(TID 4, 192.168.35.193, executor 0): 
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in 
main
        process()
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in 
process
        serializer.dump_stream(out_iter, outfile)
      File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 
223, in dump_stream
        self.serializer.dump_stream(self._batched(iterator), stream)
      File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 
141, in dump_stream
        for obj in iterator:
      File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 
212, in _batched
        for item in iterator:
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in 
mapper
        result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in 
udfs)
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in 
<genexpr>
        result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in 
udfs)
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 90, in 
<lambda>
        return lambda *a: f(*a)
      File "/.../spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in 
wrapper
        return f(*args, **kwargs)
      File "<stdin>", line 3, in divide_by_zero
    ZeroDivisionError: division by zero
    
        at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:516)
        at 
org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:81)
        at 
org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64)
        at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:469)
        at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown
 Source)
        at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:753)
        at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
        at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
        at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:469)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:472)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    
    Driver stacktrace:
        at 
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2117)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2066)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2065)
        at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2065)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1021)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1021)
        at scala.Option.foreach(Option.scala:407)
        at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1021)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2297)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2246)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2235)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:823)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2108)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2129)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2148)
        at 
org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:467)
        at 
org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420)
        at 
org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
        at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3653)
        at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2695)
        at 
org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3644)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3642)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2695)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:2902)
        at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.spark.api.python.PythonException: Traceback (most 
recent call last):
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in 
main
        process()
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in 
process
        serializer.dump_stream(out_iter, outfile)
      File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 
223, in dump_stream
        self.serializer.dump_stream(self._batched(iterator), stream)
      File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 
141, in dump_stream
        for obj in iterator:
      File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 
212, in _batched
        for item in iterator:
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in 
mapper
        result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in 
udfs)
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in 
<genexpr>
        result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in 
udfs)
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 90, in 
<lambda>
        return lambda *a: f(*a)
      File "/.../spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in 
wrapper
        return f(*args, **kwargs)
      File "<stdin>", line 3, in divide_by_zero
    ZeroDivisionError: division by zero
    
        at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:516)
        at 
org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:81)
        at 
org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64)
        at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:469)
        at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown
 Source)
        at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:753)
        at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
        at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
        at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:469)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:472)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
    ```
    
    </details>
    
    <details>
    <summary>Python exception message without this change</summary>
    
    ```
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/.../spark/python/pyspark/sql/dataframe.py", line 427, in show
        print(self._jdf.showString(n, 20, vertical))
      File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", 
line 1305, in __call__
      File "/.../spark/python/pyspark/sql/utils.py", line 98, in deco
        return f(*a, **kw)
      File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 
328, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling 
o160.showString.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 
10 in stage 5.0 failed 4 times, most recent failure: Lost task 10.3 in stage 
5.0 (TID 37, 192.168.35.193, executor 3): 
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in 
main
        process()
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in 
process
        serializer.dump_stream(out_iter, outfile)
      File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 
223, in dump_stream
        self.serializer.dump_stream(self._batched(iterator), stream)
      File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 
141, in dump_stream
        for obj in iterator:
      File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 
212, in _batched
        for item in iterator:
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in 
mapper
        result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in 
udfs)
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in 
<genexpr>
        result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in 
udfs)
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 90, in 
<lambda>
        return lambda *a: f(*a)
      File "/.../spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in 
wrapper
        return f(*args, **kwargs)
      File "<stdin>", line 3, in divide_by_zero
    ZeroDivisionError: division by zero
    
        at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:516)
        at 
org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:81)
        at 
org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64)
        at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:469)
        at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown
 Source)
        at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:753)
        at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
        at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
        at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:469)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:472)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    
    Driver stacktrace:
        at 
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2117)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2066)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2065)
        at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2065)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1021)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1021)
        at scala.Option.foreach(Option.scala:407)
        at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1021)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2297)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2246)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2235)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:823)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2108)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2129)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2148)
        at 
org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:467)
        at 
org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420)
        at 
org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
        at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3653)
        at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2695)
        at 
org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3644)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3642)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2695)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:2902)
        at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.spark.api.python.PythonException: Traceback (most 
recent call last):
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in 
main
        process()
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in 
process
        serializer.dump_stream(out_iter, outfile)
      File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 
223, in dump_stream
        self.serializer.dump_stream(self._batched(iterator), stream)
      File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 
141, in dump_stream
        for obj in iterator:
      File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 
212, in _batched
        for item in iterator:
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in 
mapper
        result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in 
udfs)
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in 
<genexpr>
        result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in 
udfs)
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 90, in 
<lambda>
        return lambda *a: f(*a)
      File "/.../spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in 
wrapper
        return f(*args, **kwargs)
      File "<stdin>", line 3, in divide_by_zero
    ZeroDivisionError: division by zero
    
        at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:516)
        at 
org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:81)
        at 
org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64)
        at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:469)
        at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown
 Source)
        at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:753)
        at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
        at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
        at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:469)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:472)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
    ```
    
    </details>
    
    <br/>
    
    Another example with Python 3.7:
    
    ```python
    spark.sql("a")
    ```
    
    <details>
    <summary>Python exception message when <code>spark.sql.pyspark.jvmStacktrace.enabled</code> is off (default)</summary>
    
    ```
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/.../spark/python/pyspark/sql/session.py", line 646, in sql
        return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
      File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", 
line 1305, in __call__
      File "/.../spark/python/pyspark/sql/utils.py", line 131, in deco
        raise_from(converted)
      File "<string>", line 3, in raise_from
    pyspark.sql.utils.ParseException:
    mismatched input 'a' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 
'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 
'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 
'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 
'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 
'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 
0)
    
    == SQL ==
    a
    ^^^
    ```
    
    </details>
    
    <details>
    <summary>Python exception message when <code>spark.sql.pyspark.jvmStacktrace.enabled</code> is on</summary>
    
    ```
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/.../spark/python/pyspark/sql/session.py", line 646, in sql
        return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
      File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", 
line 1305, in __call__
      File "/.../spark/python/pyspark/sql/utils.py", line 131, in deco
        raise_from(converted)
      File "<string>", line 3, in raise_from
    pyspark.sql.utils.ParseException:
    mismatched input 'a' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 
'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 
'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 
'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 
'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 
'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 
0)
    
    == SQL ==
    a
    ^^^
    
    JVM stacktrace:
    org.apache.spark.sql.catalyst.parser.ParseException:
    mismatched input 'a' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 
'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 
'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 
'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 
'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 
'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 
0)
    
    == SQL ==
    a
    ^^^
    
        at 
org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:266)
        at 
org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:133)
        at 
org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:49)
        at 
org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:81)
        at 
org.apache.spark.sql.SparkSession.$anonfun$sql$2(SparkSession.scala:604)
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
        at 
org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:604)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
    ```
    
    </details>
    
    <details>
    <summary>Python exception message without this change</summary>
    
    ```
    Traceback (most recent call last):
      File "/.../spark/python/pyspark/sql/utils.py", line 98, in deco
        return f(*a, **kw)
      File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 
328, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling o26.sql.
    : org.apache.spark.sql.catalyst.parser.ParseException:
    mismatched input 'a' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 
'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 
'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 
'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 
'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 
'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 
0)
    
    == SQL ==
    a
    ^^^
    
        at 
org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:266)
        at 
org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:133)
        at 
org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:49)
        at 
org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:81)
        at 
org.apache.spark.sql.SparkSession.$anonfun$sql$2(SparkSession.scala:604)
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
        at 
org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:604)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/.../spark/python/pyspark/sql/session.py", line 646, in sql
        return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
      File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", 
line 1305, in __call__
      File "/.../spark/python/pyspark/sql/utils.py", line 102, in deco
        raise converted
    pyspark.sql.utils.ParseException:
    mismatched input 'a' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 
'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 
'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 
'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 
'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 
'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 
0)
    
    == SQL ==
    a
    ^^^
    ```
    
    </details>
    
    ### Why are the changes needed?
    
    Currently, PySpark exceptions are very unfriendly to Python users because they surface a long JVM stacktrace. See "Python exception message without this change" above.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, it will change the exception message. See the examples above.
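    
    In particular, code that previously caught py4j's `Py4JJavaError` for failures inside Python UDFs now receives `pyspark.sql.utils.PythonException` instead (as the test changes below reflect). A minimal sketch, reusing the `divide_by_zero` UDF from the example above:
    
    ```python
    from py4j.protocol import Py4JJavaError
    from pyspark.sql.utils import PythonException

    try:
        spark.range(1).select(divide_by_zero("id")).show()
    except PythonException as e:
        # After this change: a Pythonic exception carrying the Python worker stacktrace.
        print(e)
    except Py4JJavaError as e:
        # Before this change: the raw Py4J error with the full JVM stacktrace.
        print(e)
    ```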
    
    ### How was this patch tested?
    
    Manually tested by
    
    ```bash
    ./bin/pyspark --conf spark.sql.pyspark.jvmStacktrace.enabled=true
    ```
    
    and running the examples above.
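    
    The same check can also be scripted instead of restarting the shell; a rough sketch (assuming an active `spark` session, the `divide_by_zero` UDF defined above, and that the flag can be set at runtime like other SQL confs):
    
    ```python
    from pyspark.sql.utils import PythonException

    for enabled in ("false", "true"):
        # The flag is a runtime SQL conf, so it can be flipped per session.
        spark.conf.set("spark.sql.pyspark.jvmStacktrace.enabled", enabled)
        try:
            spark.range(1).select(divide_by_zero("id")).show()
        except PythonException as e:
            # "JVM stacktrace:" should appear in the message only when enabled.
            print(enabled, "JVM stacktrace:" in str(e))
    ```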
    
    Closes #28661 from HyukjinKwon/python-debug.
    
    Authored-by: HyukjinKwon <gurwls...@apache.org>
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
    (cherry picked from commit e69466056fb2c121b7bbb6ad082f09deb1c41063)
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 python/pyspark/sql/tests/test_pandas_udf.py        | 14 +++---
 python/pyspark/sql/utils.py                        | 53 ++++++++++++++++++----
 .../org/apache/spark/sql/internal/SQLConf.scala    | 11 +++++
 3 files changed, 61 insertions(+), 17 deletions(-)

diff --git a/python/pyspark/sql/tests/test_pandas_udf.py b/python/pyspark/sql/tests/test_pandas_udf.py
index 4218f5c..7fa65f0 100644
--- a/python/pyspark/sql/tests/test_pandas_udf.py
+++ b/python/pyspark/sql/tests/test_pandas_udf.py
@@ -19,14 +19,12 @@ import unittest
 
 from pyspark.sql.functions import udf, pandas_udf, PandasUDFType
 from pyspark.sql.types import *
-from pyspark.sql.utils import ParseException
+from pyspark.sql.utils import ParseException, PythonException
 from pyspark.rdd import PythonEvalType
 from pyspark.testing.sqlutils import ReusedSQLTestCase, have_pandas, have_pyarrow, \
     pandas_requirement_message, pyarrow_requirement_message
 from pyspark.testing.utils import QuietTest
 
-from py4j.protocol import Py4JJavaError
-
 
 @unittest.skipIf(
     not have_pandas or not have_pyarrow,
@@ -157,14 +155,14 @@ class PandasUDFTests(ReusedSQLTestCase):
 
         # plain udf (test for SPARK-23754)
         self.assertRaisesRegexp(
-            Py4JJavaError,
+            PythonException,
             exc_message,
             df.withColumn('v', udf(foo)('id')).collect
         )
 
         # pandas scalar udf
         self.assertRaisesRegexp(
-            Py4JJavaError,
+            PythonException,
             exc_message,
             df.withColumn(
                 'v', pandas_udf(foo, 'double', PandasUDFType.SCALAR)('id')
@@ -173,7 +171,7 @@ class PandasUDFTests(ReusedSQLTestCase):
 
         # pandas grouped map
         self.assertRaisesRegexp(
-            Py4JJavaError,
+            PythonException,
             exc_message,
             df.groupBy('id').apply(
                 pandas_udf(foo, df.schema, PandasUDFType.GROUPED_MAP)
@@ -181,7 +179,7 @@ class PandasUDFTests(ReusedSQLTestCase):
         )
 
         self.assertRaisesRegexp(
-            Py4JJavaError,
+            PythonException,
             exc_message,
             df.groupBy('id').apply(
                 pandas_udf(foofoo, df.schema, PandasUDFType.GROUPED_MAP)
@@ -190,7 +188,7 @@ class PandasUDFTests(ReusedSQLTestCase):
 
         # pandas grouped agg
         self.assertRaisesRegexp(
-            Py4JJavaError,
+            PythonException,
             exc_message,
             df.groupBy('id').agg(
                 pandas_udf(foo, 'double', PandasUDFType.GROUPED_AGG)('id')
diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py
index 147ac33..27adc23 100644
--- a/python/pyspark/sql/utils.py
+++ b/python/pyspark/sql/utils.py
@@ -18,8 +18,19 @@
 import py4j
 import sys
 
+from pyspark import SparkContext
+
 if sys.version_info.major >= 3:
     unicode = str
+    # Disable exception chaining (PEP 3134) in captured exceptions
+    # in order to hide JVM stacktace.
+    exec("""
+def raise_from(e):
+    raise e from None
+""")
+else:
+    def raise_from(e):
+        raise e
 
 
 class CapturedException(Exception):
@@ -29,7 +40,11 @@ class CapturedException(Exception):
         self.cause = convert_exception(cause) if cause is not None else None
 
     def __str__(self):
+        sql_conf = SparkContext._jvm.org.apache.spark.sql.internal.SQLConf.get()
+        debug_enabled = sql_conf.pysparkJVMStacktraceEnabled()
         desc = self.desc
+        if debug_enabled:
+            desc = desc + "\nJVM stacktrace:\n%s" % self.stackTrace
         # encode unicode instance for python2 for human readable description
         if sys.version_info.major < 3 and isinstance(desc, unicode):
             return str(desc.encode('utf-8'))
@@ -67,6 +82,12 @@ class QueryExecutionException(CapturedException):
     """
 
 
+class PythonException(CapturedException):
+    """
+    Exceptions thrown from Python workers.
+    """
+
+
 class UnknownException(CapturedException):
     """
     None of the above exceptions.
@@ -75,21 +96,33 @@ class UnknownException(CapturedException):
 
 def convert_exception(e):
     s = e.toString()
-    stackTrace = '\n\t at '.join(map(lambda x: x.toString(), e.getStackTrace()))
     c = e.getCause()
+
+    jvm = SparkContext._jvm
+    jwriter = jvm.java.io.StringWriter()
+    e.printStackTrace(jvm.java.io.PrintWriter(jwriter))
+    stacktrace = jwriter.toString()
     if s.startswith('org.apache.spark.sql.AnalysisException: '):
-        return AnalysisException(s.split(': ', 1)[1], stackTrace, c)
+        return AnalysisException(s.split(': ', 1)[1], stacktrace, c)
     if s.startswith('org.apache.spark.sql.catalyst.analysis'):
-        return AnalysisException(s.split(': ', 1)[1], stackTrace, c)
+        return AnalysisException(s.split(': ', 1)[1], stacktrace, c)
     if s.startswith('org.apache.spark.sql.catalyst.parser.ParseException: '):
-        return ParseException(s.split(': ', 1)[1], stackTrace, c)
+        return ParseException(s.split(': ', 1)[1], stacktrace, c)
     if s.startswith('org.apache.spark.sql.streaming.StreamingQueryException: '):
-        return StreamingQueryException(s.split(': ', 1)[1], stackTrace, c)
+        return StreamingQueryException(s.split(': ', 1)[1], stacktrace, c)
     if s.startswith('org.apache.spark.sql.execution.QueryExecutionException: '):
-        return QueryExecutionException(s.split(': ', 1)[1], stackTrace, c)
+        return QueryExecutionException(s.split(': ', 1)[1], stacktrace, c)
     if s.startswith('java.lang.IllegalArgumentException: '):
-        return IllegalArgumentException(s.split(': ', 1)[1], stackTrace, c)
-    return UnknownException(s, stackTrace, c)
+        return IllegalArgumentException(s.split(': ', 1)[1], stacktrace, c)
+    if c is not None and (
+            c.toString().startswith('org.apache.spark.api.python.PythonException: ')
+            # To make sure this only catches Python UDFs.
+            and any(map(lambda v: "org.apache.spark.sql.execution.python" in v.toString(),
+                        c.getStackTrace()))):
+        msg = ("\n  An exception was thrown from Python worker in the executor. "
+               "The below is the Python worker stacktrace.\n%s" % c.getMessage())
+        return PythonException(msg, stacktrace)
+    return UnknownException(s, stacktrace, c)
 
 
 def capture_sql_exception(f):
@@ -99,7 +132,9 @@ def capture_sql_exception(f):
         except py4j.protocol.Py4JJavaError as e:
             converted = convert_exception(e.java_exception)
             if not isinstance(converted, UnknownException):
-                raise converted
+                # Hide where the exception came from that shows a non-Pythonic
+                # JVM exception message.
+                raise_from(converted)
             else:
                 raise
     return deco
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index b36c01d..b8b0f32 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1783,6 +1783,15 @@ object SQLConf {
       .version("3.0.0")
       .fallbackConf(ARROW_EXECUTION_ENABLED)
 
+  val PYSPARK_JVM_STACKTRACE_ENABLED =
+    buildConf("spark.sql.pyspark.jvmStacktrace.enabled")
+      .doc("When true, it shows the JVM stacktrace in the user-facing PySpark exception " +
+        "together with Python stacktrace. By default, it is disabled and hides JVM stacktrace " +
+        "and shows a Python-friendly exception only.")
+      .version("3.0.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val ARROW_SPARKR_EXECUTION_ENABLED =
     buildConf("spark.sql.execution.arrow.sparkr.enabled")
       .doc("When true, make use of Apache Arrow for columnar data transfers in SparkR. " +
@@ -3042,6 +3051,8 @@ class SQLConf extends Serializable with Logging {
 
   def arrowPySparkEnabled: Boolean = getConf(ARROW_PYSPARK_EXECUTION_ENABLED)
 
+  def pysparkJVMStacktraceEnabled: Boolean = getConf(PYSPARK_JVM_STACKTRACE_ENABLED)
+
   def arrowSparkREnabled: Boolean = getConf(ARROW_SPARKR_EXECUTION_ENABLED)
 
   def arrowPySparkFallbackEnabled: Boolean = getConf(ARROW_PYSPARK_FALLBACK_ENABLED)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
