This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new cefc27f03 [KYUUBI #6290] Add custom exception serialization for 
SparkOperationEvent
cefc27f03 is described below

commit cefc27f035c9096c63dfbca080684381f47669e2
Author: wforget <[email protected]>
AuthorDate: Thu Apr 11 19:29:41 2024 +0800

    [KYUUBI #6290] Add custom exception serialization for SparkOperationEvent
    
    # :mag: Description
    ## Issue References ๐Ÿ”—
    
    This pull request fixes #6290
    
    ## Describe Your Solution ๐Ÿ”ง
    
    Serializing the `SparkException` object may cause NPE, so I referred to 
SparkListenerEvent serialization methods to implement `ExceptionDeserializer` 
and `ExceptionSerializer`.
    
    error detail:
    
    ```
    (was java.lang.NullPointerException) (through reference chain: 
org.apache.kyuubi.engine.spark.events.SparkOperationEvent["exception"]->org.apache.spark.SparkException["internalError"])
    com.fasterxml.jackson.databind.JsonMappingException: (was 
java.lang.NullPointerException) (through reference chain: 
org.apache.kyuubi.engine.spark.events.SparkOperationEvent["exception"]->org.apache.spark.SparkException["internalError"])
            at 
com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:402)
            at 
com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:361)
            at 
com.fasterxml.jackson.databind.ser.std.StdSerializer.wrapAndThrow(StdSerializer.java:323)
            at 
com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:780)
            at 
com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:178)
            at 
com.fasterxml.jackson.databind.ser.std.ReferenceTypeSerializer.serialize(ReferenceTypeSerializer.java:386)
            at 
com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:732)
            at 
com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:772)
            at 
com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeWithType(BeanSerializerBase.java:655)
            at 
com.fasterxml.jackson.databind.ser.impl.TypeWrappedSerializer.serialize(TypeWrappedSerializer.java:32)
            at 
com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:479)
            at 
com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:318)
            at 
com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:3303)
            at 
org.apache.spark.util.JsonProtocol$.writeSparkEventToJson(JsonProtocol.scala:110)
            at 
org.apache.spark.util.JsonProtocol$.$anonfun$sparkEventToJsonString$1(JsonProtocol.scala:63)
            at 
org.apache.spark.util.JsonProtocol$.$anonfun$sparkEventToJsonString$1$adapted(JsonProtocol.scala:62)
            at org.apache.spark.util.JsonUtils.toJsonString(JsonUtils.scala:36)
            at org.apache.spark.util.JsonUtils.toJsonString$(JsonUtils.scala:33)
            at 
org.apache.spark.util.JsonProtocol$.toJsonString(JsonProtocol.scala:54)
            at 
org.apache.spark.util.JsonProtocol$.sparkEventToJsonString(JsonProtocol.scala:62)
            at 
org.apache.spark.kyuubi.KyuubiSparkEventSuite.$anonfun$new$1(KyuubiSparkEventSuite.scala:44)
            at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
            at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
            at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
            at org.scalatest.Transformer.apply(Transformer.scala:22)
            at org.scalatest.Transformer.apply(Transformer.scala:20)
            at 
org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
            at 
org.apache.kyuubi.KyuubiFunSuite.withFixture(KyuubiFunSuite.scala:65)
            at 
org.apache.kyuubi.KyuubiFunSuite.withFixture$(KyuubiFunSuite.scala:59)
            at 
org.apache.spark.kyuubi.KyuubiSparkEventSuite.withFixture(KyuubiSparkEventSuite.scala:25)
            at 
org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
            at 
org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
            at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
            at 
org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
            at 
org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
            at 
org.apache.spark.kyuubi.KyuubiSparkEventSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(KyuubiSparkEventSuite.scala:25)
            at 
org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
            at 
org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
            at 
org.apache.spark.kyuubi.KyuubiSparkEventSuite.runTest(KyuubiSparkEventSuite.scala:25)
            at 
org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
            at 
org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
            at scala.collection.immutable.List.foreach(List.scala:431)
            at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
            at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
            at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
            at 
org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
            at 
org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
            at 
org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
            at org.scalatest.Suite.run(Suite.scala:1114)
            at org.scalatest.Suite.run$(Suite.scala:1096)
            at 
org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
            at 
org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
            at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
            at 
org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
            at 
org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
            at 
org.apache.spark.kyuubi.KyuubiSparkEventSuite.org$scalatest$BeforeAndAfterAll$$super$run(KyuubiSparkEventSuite.scala:25)
            at 
org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
            at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
            at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
            at 
org.apache.spark.kyuubi.KyuubiSparkEventSuite.run(KyuubiSparkEventSuite.scala:25)
            at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:47)
            at 
org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1321)
            at 
org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1315)
            at scala.collection.immutable.List.foreach(List.scala:431)
            at 
org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1315)
            at 
org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:992)
            at 
org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:970)
            at 
org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1481)
            at 
org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:970)
            at org.scalatest.tools.Runner$.run(Runner.scala:798)
            at org.scalatest.tools.Runner.run(Runner.scala)
            at 
org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:43)
            at 
org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:26)
    Caused by: java.lang.NullPointerException
            at 
org.apache.spark.SparkThrowableHelper$.isInternalError(SparkThrowableHelper.scala:64)
            at 
org.apache.spark.SparkThrowableHelper.isInternalError(SparkThrowableHelper.scala)
            at 
org.apache.spark.SparkThrowable.isInternalError(SparkThrowable.java:50)
            at 
org.apache.spark.SparkException.isInternalError(SparkException.scala:27)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at 
com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:688)
            at 
com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:772)
            ... 69 more
    
    ```
    
    ## Types of changes :bookmark:
    
    - [X] Bugfix (non-breaking change which fixes an issue)
    - [ ] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing 
functionality to change)
    
    ## Test Plan ๐Ÿงช
    
    #### Behavior Without This Pull Request :coffin:
    
    #### Behavior With This Pull Request :tada:
    
    #### Related Unit Tests
    
    new unit test
    
    ---
    
    # Checklist ๐Ÿ“
    
    - [X] This patch was not authored or co-authored using [Generative 
Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    **Be nice. Be informative.**
    
    Closes #6289 from wForget/hotfix2.
    
    Closes #6290
    
    cf701f97e [wforget] fix test
    41df6ce9b [wforget] Add Exception Serialization for SparkOperationEvent
    
    Authored-by: wforget <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../engine/spark/events/SparkOperationEvent.scala  |  5 +-
 .../spark/kyuubi/KyuubiSparkEventSuite.scala       | 56 ++++++++++++++
 .../org/apache/kyuubi/events/JsonProtocol.scala    | 87 +++++++++++++++++++++-
 3 files changed, 146 insertions(+), 2 deletions(-)

diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkOperationEvent.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkOperationEvent.scala
index 143ba61f8..caf49fb05 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkOperationEvent.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkOperationEvent.scala
@@ -18,13 +18,14 @@
 package org.apache.kyuubi.engine.spark.events
 
 import com.fasterxml.jackson.annotation.JsonIgnore
+import com.fasterxml.jackson.databind.annotation.{JsonDeserialize, 
JsonSerialize}
 import org.apache.spark.scheduler.SparkListenerEvent
 import org.apache.spark.util.kvstore.KVIndex
 
 import org.apache.kyuubi.Utils
 import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.KVIndexParam
 import org.apache.kyuubi.engine.spark.operation.SparkOperation
-import org.apache.kyuubi.events.KyuubiEvent
+import org.apache.kyuubi.events.{ExceptionDeserializer, ExceptionSerializer, 
KyuubiEvent}
 
 /**
  * A [[SparkOperationEvent]] used to tracker the lifecycle of an operation at 
Spark SQL Engine side.
@@ -60,6 +61,8 @@ case class SparkOperationEvent(
     createTime: Long,
     startTime: Long,
     completeTime: Long,
+    @JsonSerialize(contentUsing = classOf[ExceptionSerializer])
+    @JsonDeserialize(contentUsing = classOf[ExceptionDeserializer])
     exception: Option[Throwable],
     sessionId: String,
     sessionUser: String,
diff --git 
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/KyuubiSparkEventSuite.scala
 
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/KyuubiSparkEventSuite.scala
new file mode 100644
index 000000000..2a0a05d9c
--- /dev/null
+++ 
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/KyuubiSparkEventSuite.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.kyuubi
+
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import org.apache.spark.SparkException
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.engine.spark.events.SparkOperationEvent
+
+class KyuubiSparkEventSuite extends KyuubiFunSuite {
+
+  test("test exception serializer and deserializer of SparkOperationEvent") {
+    val exception = new SparkException("message", new Exception("cause"))
+    val event = new SparkOperationEvent(
+      "statementId",
+      "statement",
+      shouldRunAsync = true,
+      "state",
+      0L,
+      0L,
+      0L,
+      0L,
+      Some(exception),
+      "sessionId",
+      "sessionUser",
+      None,
+      None,
+      None)
+    val mapper: ObjectMapper = new 
ObjectMapper().registerModule(DefaultScalaModule)
+      .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+    val json = mapper.writeValueAsString(event)
+    assert(json.contains("\"exception\":{\"Message\":\"message\",\"Stack 
Trace\":" +
+      "[{\"Declaring 
Class\":\"org.apache.spark.kyuubi.KyuubiSparkEventSuite\","))
+    val deserializeEvent = mapper.readValue(json, classOf[SparkOperationEvent])
+    assert(deserializeEvent.exception.isDefined)
+    assert(deserializeEvent.exception.get.getMessage === "message")
+    assert(deserializeEvent.exception.get.getStackTrace.length > 0)
+  }
+
+}
diff --git 
a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/JsonProtocol.scala 
b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/JsonProtocol.scala
index 32aef4f51..77e76b938 100644
--- a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/JsonProtocol.scala
+++ b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/JsonProtocol.scala
@@ -17,7 +17,10 @@
 
 package org.apache.kyuubi.events
 
-import com.fasterxml.jackson.databind.ObjectMapper
+import scala.collection.JavaConverters._
+
+import com.fasterxml.jackson.core.{JsonGenerator, JsonParser}
+import com.fasterxml.jackson.databind.{DeserializationContext, 
JsonDeserializer, JsonNode, JsonSerializer, ObjectMapper, SerializerProvider}
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 
 object JsonProtocol {
@@ -30,3 +33,85 @@ object JsonProtocol {
     mapper.readValue(jsonValue, cls)
   }
 }
+
+// Exception serializer and deserializer, copy from 
org.apache.spark.util.JsonProtocol
+class ExceptionSerializer extends JsonSerializer[Exception] {
+
+  override def serialize(
+      value: Exception,
+      gen: JsonGenerator,
+      serializers: SerializerProvider): Unit = {
+    exceptionToJson(value, gen)
+  }
+
+  private def exceptionToJson(exception: Exception, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeStringField("Message", exception.getMessage)
+    g.writeFieldName("Stack Trace")
+    stackTraceToJson(exception.getStackTrace, g)
+    g.writeEndObject()
+  }
+
+  private def stackTraceToJson(stackTrace: Array[StackTraceElement], g: 
JsonGenerator): Unit = {
+    g.writeStartArray()
+    stackTrace.foreach { line =>
+      g.writeStartObject()
+      g.writeStringField("Declaring Class", line.getClassName)
+      g.writeStringField("Method Name", line.getMethodName)
+      g.writeStringField("File Name", line.getFileName)
+      g.writeNumberField("Line Number", line.getLineNumber)
+      g.writeEndObject()
+    }
+    g.writeEndArray()
+  }
+}
+
+class ExceptionDeserializer extends JsonDeserializer[Exception] {
+
+  override def deserialize(jsonParser: JsonParser, ctxt: 
DeserializationContext): Exception = {
+    val jsonNode = jsonParser.readValueAsTree[JsonNode]()
+    exceptionFromJson(jsonNode)
+  }
+
+  private def exceptionFromJson(json: JsonNode): Exception = {
+    val message = jsonOption(json.get("Message")).map(_.extractString).orNull
+    val e = new Exception(message)
+    e.setStackTrace(stackTraceFromJson(json.get("Stack Trace")))
+    e
+  }
+
+  private def stackTraceFromJson(json: JsonNode): Array[StackTraceElement] = {
+    jsonOption(json).map(_.extractElements.map { line =>
+      val declaringClass = line.get("Declaring Class").extractString
+      val methodName = line.get("Method Name").extractString
+      val fileName = jsonOption(line.get("File 
Name")).map(_.extractString).orNull
+      val lineNumber = line.get("Line Number").extractInt
+      new StackTraceElement(declaringClass, methodName, fileName, lineNumber)
+    }.toArray).getOrElse(Array[StackTraceElement]())
+  }
+
+  private def jsonOption(json: JsonNode): Option[JsonNode] = {
+    if (json == null || json.isNull) {
+      None
+    } else {
+      Some(json)
+    }
+  }
+
+  implicit private class JsonNodeImplicits(json: JsonNode) {
+    def extractElements: Iterator[JsonNode] = {
+      require(json.isContainerNode, s"Expected container, got 
${json.getNodeType}")
+      json.elements.asScala
+    }
+
+    def extractInt: Int = {
+      require(json.isNumber, s"Expected number, got ${json.getNodeType}")
+      json.intValue
+    }
+
+    def extractString: String = {
+      require(json.isTextual || json.isNull, s"Expected string or NULL, got 
${json.getNodeType}")
+      json.textValue
+    }
+  }
+}

Reply via email to