This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new dcd710d3e12 [SPARK-43340][CORE] Handle missing stack-trace field in 
eventlogs
dcd710d3e12 is described below

commit dcd710d3e12f6cc540cea2b8c747bb6b61254504
Author: Ahmed Hussein <[email protected]>
AuthorDate: Fri May 5 16:00:30 2023 -0700

    [SPARK-43340][CORE] Handle missing stack-trace field in eventlogs
    
    ### What changes were proposed in this pull request?
    
    This PR fixes a regression introduced by #36885 which broke JsonProtocol's 
ability to handle missing fields in the exception field. Old eventlogs missing a 
`Stack Trace` will raise an NPE.
    As a result, the SHS misinterprets failed jobs/SQLs as `Active/Incomplete`
    
    This PR solves this problem by checking the JsonNode for null. If it is 
null, an empty array of `StackTraceElements` is returned instead.
    
    ### Why are the changes needed?
    
    Fix a case which prevents the history server from identifying failed jobs 
if the stacktrace was not set.
    
    Example eventlog
    
    ```
    {
       "Event":"SparkListenerJobEnd",
       "Job ID":31,
       "Completion Time":1616171909785,
       "Job Result":{
          "Result":"JobFailed",
          "Exception":{
             "Message":"Job aborted"
          }
       }
    }
    ```
    
    **Original behavior:**
    
    The job is marked as `incomplete`
    
    Error from the SHS logs:
    
    ```
    23/05/01 21:57:16 INFO FsHistoryProvider: Parsing 
file:/tmp/nds_q86_fail_test to re-build UI...
    23/05/01 21:57:17 ERROR ReplayListenerBus: Exception parsing Spark event 
log: file:/tmp/nds_q86_fail_test
    java.lang.NullPointerException
        at 
org.apache.spark.util.JsonProtocol$JsonNodeImplicits.extractElements(JsonProtocol.scala:1589)
        at 
org.apache.spark.util.JsonProtocol$.stackTraceFromJson(JsonProtocol.scala:1558)
        at 
org.apache.spark.util.JsonProtocol$.exceptionFromJson(JsonProtocol.scala:1569)
        at 
org.apache.spark.util.JsonProtocol$.jobResultFromJson(JsonProtocol.scala:1423)
        at 
org.apache.spark.util.JsonProtocol$.jobEndFromJson(JsonProtocol.scala:967)
        at 
org.apache.spark.util.JsonProtocol$.sparkEventFromJson(JsonProtocol.scala:878)
        at 
org.apache.spark.util.JsonProtocol$.sparkEventFromJson(JsonProtocol.scala:865)
    ....
    23/05/01 21:57:17 ERROR ReplayListenerBus: Malformed line #24368: 
{"Event":"SparkListenerJobEnd","Job ID":31,"Completion Time":1616171909785,"Job 
Result":{"Result":"JobFailed","Exception":
    {"Message":"Job aborted"}
    }}
    ```
    
    **After the fix:**
    
    Job 31 is marked as `failedJob`
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Added new unit test in JsonProtocolSuite.
    
    Closes #41050 from amahussein/aspark-43340-b.
    
    Authored-by: Ahmed Hussein <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../scala/org/apache/spark/util/JsonProtocol.scala |  4 +-
 .../org/apache/spark/util/JsonProtocolSuite.scala  | 54 ++++++++++++++++++++++
 2 files changed, 56 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala 
b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index fc680a73ca9..f1d675738e2 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -1555,13 +1555,13 @@ private[spark] object JsonProtocol {
   }
 
   def stackTraceFromJson(json: JsonNode): Array[StackTraceElement] = {
-    json.extractElements.map { line =>
+    jsonOption(json).map(_.extractElements.map { line =>
       val declaringClass = line.get("Declaring Class").extractString
       val methodName = line.get("Method Name").extractString
       val fileName = jsonOption(line.get("File 
Name")).map(_.extractString).orNull
       val lineNumber = line.get("Line Number").extractInt
       new StackTraceElement(declaringClass, methodName, fileName, lineNumber)
-    }.toArray
+    }.toArray).getOrElse(Array[StackTraceElement]())
   }
 
   def exceptionFromJson(json: JsonNode): Exception = {
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala 
b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 0d84e88436c..8105df64705 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -820,6 +820,60 @@ class JsonProtocolSuite extends SparkFunSuite {
     ex.setStackTrace(Array(new StackTraceElement("class", "method", null, -1)))
     testException(ex)
   }
+
+  test("SPARK-43340: Handle missing Stack Trace in event log") {
+    val exNoStackJson =
+      """
+        |{
+        |  "Message": "Job aborted"
+        |}
+        |""".stripMargin
+    val exNoStack = JsonProtocol.exceptionFromJson(exNoStackJson)
+    assert(exNoStack.getStackTrace.isEmpty)
+
+    val exEmptyStackJson =
+      """
+        |{
+        |  "Message": "Job aborted",
+        |  "Stack Trace": []
+        |}
+        |""".stripMargin
+    val exEmptyStack = JsonProtocol.exceptionFromJson(exEmptyStackJson)
+    assert(exEmptyStack.getStackTrace.isEmpty)
+
+    // test entire job failure event is equivalent
+    val exJobFailureNoStackJson =
+      """
+        |{
+        |   "Event": "SparkListenerJobEnd",
+        |   "Job ID": 31,
+        |   "Completion Time": 1616171909785,
+        |   "Job Result":{
+        |      "Result": "JobFailed",
+        |      "Exception": {
+        |         "Message": "Job aborted"
+        |      }
+        |   }
+        |}
+        |""".stripMargin
+    val exJobFailureExpectedJson =
+      """
+        |{
+        |   "Event": "SparkListenerJobEnd",
+        |   "Job ID": 31,
+        |   "Completion Time": 1616171909785,
+        |   "Job Result": {
+        |      "Result": "JobFailed",
+        |      "Exception": {
+        |         "Message": "Job aborted",
+        |         "Stack Trace": []
+        |      }
+        |   }
+        |}
+        |""".stripMargin
+    val jobFailedEvent = 
JsonProtocol.sparkEventFromJson(exJobFailureNoStackJson)
+    testEvent(jobFailedEvent, exJobFailureExpectedJson)
+  }
 }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to