This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push: new 0cc21bab2ec [SPARK-43340][CORE] Handle missing stack-trace field in eventlogs 0cc21bab2ec is described below commit 0cc21bab2ec055ab8fb7badc7dbdb7ecd7d80a26 Author: Ahmed Hussein <ahuss...@apache.org> AuthorDate: Fri May 5 16:00:30 2023 -0700 [SPARK-43340][CORE] Handle missing stack-trace field in eventlogs This PR fixes a regression introduced by #36885 which broke JsonProtocol's ability to handle missing fields in the exception field. Old eventlogs missing a `Stack Trace` field will raise an NPE. As a result, SHS misinterprets failed-jobs/SQLs as `Active/Incomplete`. This PR solves this problem by checking the JsonNode for null. If it is null, an empty array of `StackTraceElement`s is returned instead. Fix a case which prevents the history server from identifying failed jobs if the stacktrace was not set. Example eventlog ``` { "Event":"SparkListenerJobEnd", "Job ID":31, "Completion Time":1616171909785, "Job Result":{ "Result":"JobFailed", "Exception":{ "Message":"Job aborted" } } } ``` **Original behavior:** The job is marked as `incomplete` Error from the SHS logs: ``` 23/05/01 21:57:16 INFO FsHistoryProvider: Parsing file:/tmp/nds_q86_fail_test to re-build UI... 23/05/01 21:57:17 ERROR ReplayListenerBus: Exception parsing Spark event log: file:/tmp/nds_q86_fail_test java.lang.NullPointerException at org.apache.spark.util.JsonProtocol$JsonNodeImplicits.extractElements(JsonProtocol.scala:1589) at org.apache.spark.util.JsonProtocol$.stackTraceFromJson(JsonProtocol.scala:1558) at org.apache.spark.util.JsonProtocol$.exceptionFromJson(JsonProtocol.scala:1569) at org.apache.spark.util.JsonProtocol$.jobResultFromJson(JsonProtocol.scala:1423) at org.apache.spark.util.JsonProtocol$.jobEndFromJson(JsonProtocol.scala:967) at org.apache.spark.util.JsonProtocol$.sparkEventFromJson(JsonProtocol.scala:878) at org.apache.spark.util.JsonProtocol$.sparkEventFromJson(JsonProtocol.scala:865) .... 
23/05/01 21:57:17 ERROR ReplayListenerBus: Malformed line #24368: {"Event":"SparkListenerJobEnd","Job ID":31,"Completion Time":1616171909785,"Job Result":{"Result":"JobFailed","Exception": {"Message":"Job aborted"} }} ``` **After the fix:** Job 31 is marked as `failedJob` No. Added new unit test in JsonProtocolSuite. Closes #41050 from amahussein/aspark-43340-b. Authored-by: Ahmed Hussein <ahuss...@apache.org> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> (cherry picked from commit dcd710d3e12f6cc540cea2b8c747bb6b61254504) Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .../scala/org/apache/spark/util/JsonProtocol.scala | 4 +- .../org/apache/spark/util/JsonProtocolSuite.scala | 54 ++++++++++++++++++++++ 2 files changed, 56 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 6b75971fc25..c1d52484049 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -1555,13 +1555,13 @@ private[spark] object JsonProtocol { } def stackTraceFromJson(json: JsonNode): Array[StackTraceElement] = { - json.extractElements.map { line => + jsonOption(json).map(_.extractElements.map { line => val declaringClass = line.get("Declaring Class").extractString val methodName = line.get("Method Name").extractString val fileName = line.get("File Name").extractString val lineNumber = line.get("Line Number").extractInt new StackTraceElement(declaringClass, methodName, fileName, lineNumber) - }.toArray + }.toArray).getOrElse(Array[StackTraceElement]()) } def exceptionFromJson(json: JsonNode): Exception = { diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index ea71a4b3f1b..44516c1d567 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ 
b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -810,6 +810,60 @@ class JsonProtocolSuite extends SparkFunSuite { JsonProtocol.taskEndReasonFromJson(exceptionFailureJson).asInstanceOf[ExceptionFailure] assert(exceptionFailure.description == null) } + + test("SPARK-43340: Handle missing Stack Trace in event log") { + val exNoStackJson = + """ + |{ + | "Message": "Job aborted" + |} + |""".stripMargin + val exNoStack = JsonProtocol.exceptionFromJson(exNoStackJson) + assert(exNoStack.getStackTrace.isEmpty) + + val exEmptyStackJson = + """ + |{ + | "Message": "Job aborted", + | "Stack Trace": [] + |} + |""".stripMargin + val exEmptyStack = JsonProtocol.exceptionFromJson(exEmptyStackJson) + assert(exEmptyStack.getStackTrace.isEmpty) + + // test entire job failure event is equivalent + val exJobFailureNoStackJson = + """ + |{ + | "Event": "SparkListenerJobEnd", + | "Job ID": 31, + | "Completion Time": 1616171909785, + | "Job Result":{ + | "Result": "JobFailed", + | "Exception": { + | "Message": "Job aborted" + | } + | } + |} + |""".stripMargin + val exJobFailureExpectedJson = + """ + |{ + | "Event": "SparkListenerJobEnd", + | "Job ID": 31, + | "Completion Time": 1616171909785, + | "Job Result": { + | "Result": "JobFailed", + | "Exception": { + | "Message": "Job aborted", + | "Stack Trace": [] + | } + | } + |} + |""".stripMargin + val jobFailedEvent = JsonProtocol.sparkEventFromJson(exJobFailureNoStackJson) + testEvent(jobFailedEvent, exJobFailureExpectedJson) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org