This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 0cc21bab2ec [SPARK-43340][CORE] Handle missing stack-trace field in 
eventlogs
0cc21bab2ec is described below

commit 0cc21bab2ec055ab8fb7badc7dbdb7ecd7d80a26
Author: Ahmed Hussein <ahuss...@apache.org>
AuthorDate: Fri May 5 16:00:30 2023 -0700

    [SPARK-43340][CORE] Handle missing stack-trace field in eventlogs
    
    This PR fixes a regression introduced by #36885 which broke JsonProtocol's 
ability to handle missing fields in the exception field. Old eventlogs missing a 
`Stack Trace` will raise an NPE.
    As a result, SHS misinterprets failed jobs/SQLs as `Active/Incomplete`.
    
    This PR solves this problem by checking the JsonNode for null. If it is 
null, an empty array of `StackTraceElement` is returned instead.
    
    Fix a case which prevents the history server from identifying failed jobs 
if the stacktrace was not set.
    
    Example eventlog
    
    ```
    {
       "Event":"SparkListenerJobEnd",
       "Job ID":31,
       "Completion Time":1616171909785,
       "Job Result":{
          "Result":"JobFailed",
          "Exception":{
             "Message":"Job aborted"
          }
       }
    }
    ```
    
    **Original behavior:**
    
    The job is marked as `incomplete`
    
    Error from the SHS logs:
    
    ```
    23/05/01 21:57:16 INFO FsHistoryProvider: Parsing 
file:/tmp/nds_q86_fail_test to re-build UI...
    23/05/01 21:57:17 ERROR ReplayListenerBus: Exception parsing Spark event 
log: file:/tmp/nds_q86_fail_test
    java.lang.NullPointerException
        at 
org.apache.spark.util.JsonProtocol$JsonNodeImplicits.extractElements(JsonProtocol.scala:1589)
        at 
org.apache.spark.util.JsonProtocol$.stackTraceFromJson(JsonProtocol.scala:1558)
        at 
org.apache.spark.util.JsonProtocol$.exceptionFromJson(JsonProtocol.scala:1569)
        at 
org.apache.spark.util.JsonProtocol$.jobResultFromJson(JsonProtocol.scala:1423)
        at 
org.apache.spark.util.JsonProtocol$.jobEndFromJson(JsonProtocol.scala:967)
        at 
org.apache.spark.util.JsonProtocol$.sparkEventFromJson(JsonProtocol.scala:878)
        at 
org.apache.spark.util.JsonProtocol$.sparkEventFromJson(JsonProtocol.scala:865)
    ....
    23/05/01 21:57:17 ERROR ReplayListenerBus: Malformed line #24368: 
{"Event":"SparkListenerJobEnd","Job ID":31,"Completion Time":1616171909785,"Job 
Result":{"Result":"JobFailed","Exception":
    {"Message":"Job aborted"}
    }}
    ```
    
    **After the fix:**
    
    Job 31 is marked as `failedJob`
    
    No.
    
    Added new unit test in JsonProtocolSuite.
    
    Closes #41050 from amahussein/aspark-43340-b.
    
    Authored-by: Ahmed Hussein <ahuss...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
    (cherry picked from commit dcd710d3e12f6cc540cea2b8c747bb6b61254504)
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../scala/org/apache/spark/util/JsonProtocol.scala |  4 +-
 .../org/apache/spark/util/JsonProtocolSuite.scala  | 54 ++++++++++++++++++++++
 2 files changed, 56 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala 
b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 6b75971fc25..c1d52484049 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -1555,13 +1555,13 @@ private[spark] object JsonProtocol {
   }
 
   def stackTraceFromJson(json: JsonNode): Array[StackTraceElement] = {
-    json.extractElements.map { line =>
+    jsonOption(json).map(_.extractElements.map { line =>
       val declaringClass = line.get("Declaring Class").extractString
       val methodName = line.get("Method Name").extractString
       val fileName = line.get("File Name").extractString
       val lineNumber = line.get("Line Number").extractInt
       new StackTraceElement(declaringClass, methodName, fileName, lineNumber)
-    }.toArray
+    }.toArray).getOrElse(Array[StackTraceElement]())
   }
 
   def exceptionFromJson(json: JsonNode): Exception = {
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala 
b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index ea71a4b3f1b..44516c1d567 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -810,6 +810,60 @@ class JsonProtocolSuite extends SparkFunSuite {
       
JsonProtocol.taskEndReasonFromJson(exceptionFailureJson).asInstanceOf[ExceptionFailure]
     assert(exceptionFailure.description == null)
   }
+
+  test("SPARK-43340: Handle missing Stack Trace in event log") {
+    val exNoStackJson =
+      """
+        |{
+        |  "Message": "Job aborted"
+        |}
+        |""".stripMargin
+    val exNoStack = JsonProtocol.exceptionFromJson(exNoStackJson)
+    assert(exNoStack.getStackTrace.isEmpty)
+
+    val exEmptyStackJson =
+      """
+        |{
+        |  "Message": "Job aborted",
+        |  "Stack Trace": []
+        |}
+        |""".stripMargin
+    val exEmptyStack = JsonProtocol.exceptionFromJson(exEmptyStackJson)
+    assert(exEmptyStack.getStackTrace.isEmpty)
+
+    // test entire job failure event is equivalent
+    val exJobFailureNoStackJson =
+      """
+        |{
+        |   "Event": "SparkListenerJobEnd",
+        |   "Job ID": 31,
+        |   "Completion Time": 1616171909785,
+        |   "Job Result":{
+        |      "Result": "JobFailed",
+        |      "Exception": {
+        |         "Message": "Job aborted"
+        |      }
+        |   }
+        |}
+        |""".stripMargin
+    val exJobFailureExpectedJson =
+      """
+        |{
+        |   "Event": "SparkListenerJobEnd",
+        |   "Job ID": 31,
+        |   "Completion Time": 1616171909785,
+        |   "Job Result": {
+        |      "Result": "JobFailed",
+        |      "Exception": {
+        |         "Message": "Job aborted",
+        |         "Stack Trace": []
+        |      }
+        |   }
+        |}
+        |""".stripMargin
+    val jobFailedEvent = 
JsonProtocol.sparkEventFromJson(exJobFailureNoStackJson)
+    testEvent(jobFailedEvent, exJobFailureExpectedJson)
+  }
 }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to