This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new dcd710d3e12 [SPARK-43340][CORE] Handle missing stack-trace field in
eventlogs
dcd710d3e12 is described below
commit dcd710d3e12f6cc540cea2b8c747bb6b61254504
Author: Ahmed Hussein <[email protected]>
AuthorDate: Fri May 5 16:00:30 2023 -0700
[SPARK-43340][CORE] Handle missing stack-trace field in eventlogs
### What changes were proposed in this pull request?
This PR fixes a regression introduced by #36885, which broke JsonProtocol's
ability to handle missing fields in the exception field. Old eventlogs missing a
`Stack Trace` will raise an NPE.
As a result, SHS misinterprets failed-jobs/SQLs as `Active/Incomplete`
This PR solves this problem by checking the JsonNode for null. If it is
null, an empty array of `StackTraceElement`s is returned instead.
### Why are the changes needed?
Fix a case which prevents the history server from identifying failed jobs
if the stacktrace was not set.
Example eventlog
```
{
"Event":"SparkListenerJobEnd",
"Job ID":31,
"Completion Time":1616171909785,
"Job Result":{
"Result":"JobFailed",
"Exception":{
"Message":"Job aborted"
}
}
}
```
**Original behavior:**
The job is marked as `incomplete`
Error from the SHS logs:
```
23/05/01 21:57:16 INFO FsHistoryProvider: Parsing
file:/tmp/nds_q86_fail_test to re-build UI...
23/05/01 21:57:17 ERROR ReplayListenerBus: Exception parsing Spark event
log: file:/tmp/nds_q86_fail_test
java.lang.NullPointerException
at
org.apache.spark.util.JsonProtocol$JsonNodeImplicits.extractElements(JsonProtocol.scala:1589)
at
org.apache.spark.util.JsonProtocol$.stackTraceFromJson(JsonProtocol.scala:1558)
at
org.apache.spark.util.JsonProtocol$.exceptionFromJson(JsonProtocol.scala:1569)
at
org.apache.spark.util.JsonProtocol$.jobResultFromJson(JsonProtocol.scala:1423)
at
org.apache.spark.util.JsonProtocol$.jobEndFromJson(JsonProtocol.scala:967)
at
org.apache.spark.util.JsonProtocol$.sparkEventFromJson(JsonProtocol.scala:878)
at
org.apache.spark.util.JsonProtocol$.sparkEventFromJson(JsonProtocol.scala:865)
....
23/05/01 21:57:17 ERROR ReplayListenerBus: Malformed line #24368:
{"Event":"SparkListenerJobEnd","Job ID":31,"Completion Time":1616171909785,"Job
Result":{"Result":"JobFailed","Exception":
{"Message":"Job aborted"}
}}
```
**After the fix:**
Job 31 is marked as `failedJob`
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added new unit test in JsonProtocolSuite.
Closes #41050 from amahussein/aspark-43340-b.
Authored-by: Ahmed Hussein <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../scala/org/apache/spark/util/JsonProtocol.scala | 4 +-
.../org/apache/spark/util/JsonProtocolSuite.scala | 54 ++++++++++++++++++++++
2 files changed, 56 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index fc680a73ca9..f1d675738e2 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -1555,13 +1555,13 @@ private[spark] object JsonProtocol {
}
def stackTraceFromJson(json: JsonNode): Array[StackTraceElement] = {
- json.extractElements.map { line =>
+ jsonOption(json).map(_.extractElements.map { line =>
val declaringClass = line.get("Declaring Class").extractString
val methodName = line.get("Method Name").extractString
val fileName = jsonOption(line.get("File Name")).map(_.extractString).orNull
val lineNumber = line.get("Line Number").extractInt
new StackTraceElement(declaringClass, methodName, fileName, lineNumber)
- }.toArray
+ }.toArray).getOrElse(Array[StackTraceElement]())
}
def exceptionFromJson(json: JsonNode): Exception = {
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 0d84e88436c..8105df64705 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -820,6 +820,60 @@ class JsonProtocolSuite extends SparkFunSuite {
ex.setStackTrace(Array(new StackTraceElement("class", "method", null, -1)))
testException(ex)
}
+
+ test("SPARK-43340: Handle missing Stack Trace in event log") {
+ val exNoStackJson =
+ """
+ |{
+ | "Message": "Job aborted"
+ |}
+ |""".stripMargin
+ val exNoStack = JsonProtocol.exceptionFromJson(exNoStackJson)
+ assert(exNoStack.getStackTrace.isEmpty)
+
+ val exEmptyStackJson =
+ """
+ |{
+ | "Message": "Job aborted",
+ | "Stack Trace": []
+ |}
+ |""".stripMargin
+ val exEmptyStack = JsonProtocol.exceptionFromJson(exEmptyStackJson)
+ assert(exEmptyStack.getStackTrace.isEmpty)
+
+ // test entire job failure event is equivalent
+ val exJobFailureNoStackJson =
+ """
+ |{
+ | "Event": "SparkListenerJobEnd",
+ | "Job ID": 31,
+ | "Completion Time": 1616171909785,
+ | "Job Result":{
+ | "Result": "JobFailed",
+ | "Exception": {
+ | "Message": "Job aborted"
+ | }
+ | }
+ |}
+ |""".stripMargin
+ val exJobFailureExpectedJson =
+ """
+ |{
+ | "Event": "SparkListenerJobEnd",
+ | "Job ID": 31,
+ | "Completion Time": 1616171909785,
+ | "Job Result": {
+ | "Result": "JobFailed",
+ | "Exception": {
+ | "Message": "Job aborted",
+ | "Stack Trace": []
+ | }
+ | }
+ |}
+ |""".stripMargin
+    val jobFailedEvent = JsonProtocol.sparkEventFromJson(exJobFailureNoStackJson)
+ testEvent(jobFailedEvent, exJobFailureExpectedJson)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]