This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new be0b9c1  [KYUUBI #1169] Decouple EngineLoggingService From 
SparkSQLEngine
be0b9c1 is described below

commit be0b9c1e69b4589fff6dedfa2fb68f97476371d7
Author: Kent Yao <[email protected]>
AuthorDate: Tue Sep 28 23:00:16 2021 +0800

    [KYUUBI #1169] Decouple EngineLoggingService From SparkSQLEngine
    
    <!--
    Thanks for sending a pull request!
    
    Here are some tips for you:
      1. If this is your first time, please read our contributor guidelines: 
https://kyuubi.readthedocs.io/en/latest/community/contributions.html
      2. If the PR is related to an issue in 
https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your 
PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
      3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., 
'[WIP][KYUUBI #XXXX] Your PR title ...'.
    -->
    
    ### _Why are the changes needed?_
    <!--
    Please clarify why the changes are needed. For instance,
      1. If you add a feature, you can talk about the use case of it.
      2. If you fix a bug, you can clarify why it is a bug.
    -->
    
    1. Make EventLoggingService creation fault-tolerant: if any error occurs 
while setting it up, the main bootstrap code still proceeds.
    
    2. Separate EventLoggingService from SparkSQLEngine, and start 
EventLoggingService before SparkSQLEngine so that all lifecycle statuses are 
captured. Additionally, add a shutdown hook, executed after 
SparkSQLEngine.stop, that calls EventLoggingService.stop.
    
    3. Wrap every step of SparkSQLEngine bootstrapping in try-catch to produce 
better diagnostic messages.
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run 
test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests)
 locally before making a pull request
    
    Closes #1171 from yaooqinn/1169.
    
    Closes #1169
    
    3a661555 [Kent Yao] ci
    733bd1ed [Kent Yao] address comments
    36ba1661 [Kent Yao] [KYUUBI #1169] Decouple EngineLoggingService From 
SparkSQLEngine
    
    Authored-by: Kent Yao <[email protected]>
    Signed-off-by: Kent Yao <[email protected]>
---
 .../kyuubi/engine/spark/SparkSQLEngine.scala       | 87 ++++++++++++----------
 .../engine/spark/events/EventLoggingService.scala  | 28 +++----
 .../spark/events/EventLoggingServiceSuite.scala    |  2 +-
 3 files changed, 62 insertions(+), 55 deletions(-)

diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index de160d0..d9941ba 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -20,12 +20,14 @@ package org.apache.kyuubi.engine.spark
 import java.time.Instant
 import java.util.concurrent.CountDownLatch
 
+import scala.util.control.NonFatal
+
 import org.apache.spark.SparkConf
 import org.apache.spark.kyuubi.SparkSQLEngineListener
 import org.apache.spark.kyuubi.ui.EngineTab
 import org.apache.spark.sql.SparkSession
 
-import org.apache.kyuubi.Logging
+import org.apache.kyuubi.{KyuubiException, Logging}
 import org.apache.kyuubi.Utils._
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
@@ -33,23 +35,18 @@ import 
org.apache.kyuubi.engine.spark.SparkSQLEngine.countDownLatch
 import org.apache.kyuubi.engine.spark.events.{EngineEvent, EngineEventsStore, 
EventLoggingService}
 import org.apache.kyuubi.ha.HighAvailabilityConf._
 import org.apache.kyuubi.ha.client.RetryPolicies
-import org.apache.kyuubi.service.{Serverable, ServiceState}
+import org.apache.kyuubi.service.Serverable
 import org.apache.kyuubi.util.SignalRegister
 
 case class SparkSQLEngine(spark: SparkSession) extends 
Serverable("SparkSQLEngine") {
 
-  lazy val engineStatus: EngineEvent = EngineEvent(this)
-
-  private val eventLogging = new EventLoggingService(this)
   override val backendService = new SparkSQLBackendService(spark)
   override val frontendServices = Seq(new 
SparkThriftBinaryFrontendService(this))
 
   override def initialize(conf: KyuubiConf): Unit = {
     val listener = new SparkSQLEngineListener(this, new 
EngineEventsStore(conf))
     spark.sparkContext.addSparkListener(listener)
-    addService(eventLogging)
     super.initialize(conf)
-    eventLogging.onEvent(engineStatus.copy(state = 
ServiceState.INITIALIZED.id))
   }
 
   override def start(): Unit = {
@@ -57,24 +54,11 @@ case class SparkSQLEngine(spark: SparkSession) extends 
Serverable("SparkSQLEngin
     // Start engine self-terminating checker after all services are ready and 
it can be reached by
     // all servers in engine spaces.
     backendService.sessionManager.startTerminatingChecker()
-    eventLogging.onEvent(engineStatus.copy(state = ServiceState.STARTED.id))
-  }
-
-  override def stop(): Unit = {
-    if (!state.equals(ServiceState.LATENT)) {
-      eventLogging.onEvent(
-        engineStatus.copy(state = ServiceState.STOPPED.id, endTime = 
System.currentTimeMillis()))
-    }
-    super.stop()
   }
 
   override protected def stopServer(): Unit = {
     countDownLatch.countDown()
   }
-
-  def engineId: String = {
-    
spark.sparkContext.applicationAttemptId.getOrElse(spark.sparkContext.applicationId)
-  }
 }
 
 object SparkSQLEngine extends Logging {
@@ -127,12 +111,37 @@ object SparkSQLEngine extends Logging {
   def startEngine(spark: SparkSession): Unit = {
     currentEngine = Some(new SparkSQLEngine(spark))
     currentEngine.foreach { engine =>
-      engine.initialize(kyuubiConf)
-      engine.start()
+      // start event logging ahead so that we can capture all statuses
+      val eventLogging = new EventLoggingService(spark.sparkContext)
+      try {
+        eventLogging.initialize(kyuubiConf)
+        eventLogging.start()
+      } catch {
+        case NonFatal(e) =>
+          // Don't block the main process if the `EventLoggingService` failed 
to start
+          warn(s"Failed to initialize EventLoggingService: ${e.getMessage}", e)
+      }
+
+      try {
+        engine.initialize(kyuubiConf)
+        EventLoggingService.onEvent(EngineEvent(engine))
+      } catch {
+        case t: Throwable =>
+          throw new KyuubiException(s"Failed to initialize SparkSQLEngine: 
${t.getMessage}", t)
+      }
+      try {
+        engine.start()
+        EngineTab(engine)
+        val event = EngineEvent(engine)
+        info(event)
+        EventLoggingService.onEvent(event)
+      } catch {
+        case t: Throwable =>
+          throw new KyuubiException(s"Failed to start SparkSQLEngine: 
${t.getMessage}", t)
+      }
       // Stop engine before SparkContext stopped to avoid calling a stopped 
SparkContext
-      addShutdownHook(() => engine.stop(), SPARK_CONTEXT_SHUTDOWN_PRIORITY + 1)
-      EngineTab(engine)
-      info(engine.engineStatus)
+      addShutdownHook(() => engine.stop(), SPARK_CONTEXT_SHUTDOWN_PRIORITY + 2)
+      addShutdownHook(() => eventLogging.stop(), 
SPARK_CONTEXT_SHUTDOWN_PRIORITY + 1)
     }
   }
 
@@ -141,22 +150,20 @@ object SparkSQLEngine extends Logging {
     var spark: SparkSession = null
     try {
       spark = createSpark()
-      startEngine(spark)
-      // blocking main thread
-      countDownLatch.await()
-    } catch {
-      case t: Throwable if currentEngine.isDefined =>
-        currentEngine.foreach { engine =>
-          val status =
-            engine.engineStatus.copy(diagnostic = s"Error State SparkSQL 
Engine ${t.getMessage}")
-          if (!engine.getServiceState.equals(ServiceState.LATENT)) {
-            EventLoggingService.onEvent(status)
-          }
-          error(status, t)
+      try {
+        startEngine(spark)
+        // blocking main thread
+        countDownLatch.await()
+      } catch {
+        case e: KyuubiException if currentEngine.isDefined =>
+          val engine = currentEngine.get
           engine.stop()
-        }
-      case t: Throwable =>
-        error("Create SparkSQL Engine Failed", t)
+          val event = EngineEvent(engine).copy(diagnostic = e.getMessage)
+          EventLoggingService.onEvent(event)
+          error(event, e)
+      }
+    } catch {
+      case t: Throwable => error(s"Failed to instantiate SparkSession: 
${t.getMessage}", t)
     } finally {
       if (spark != null) {
         spark.stop()
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLoggingService.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLoggingService.scala
index 568ade2..7539cce 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLoggingService.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EventLoggingService.scala
@@ -17,29 +17,28 @@
 
 package org.apache.kyuubi.engine.spark.events
 
+import org.apache.spark.SparkContext
 import org.apache.spark.kyuubi.SparkContextHelper
 
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.ENGINE_EVENT_JSON_LOG_PATH
-import org.apache.kyuubi.config.KyuubiConf.ENGINE_EVENT_LOGGERS
-import org.apache.kyuubi.engine.spark.{KyuubiSparkUtil, SparkSQLEngine}
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_EVENT_JSON_LOG_PATH, 
ENGINE_EVENT_LOGGERS}
 import org.apache.kyuubi.engine.spark.events.EventLoggingService._service
-import org.apache.kyuubi.events.AbstractEventLoggingService
-import org.apache.kyuubi.events.EventLoggerType
-import org.apache.kyuubi.events.JsonEventLogger
+import org.apache.kyuubi.events.{AbstractEventLoggingService, EventLoggerType, 
JsonEventLogger}
 
-class EventLoggingService(engine: SparkSQLEngine)
+class EventLoggingService(spark: SparkContext)
   extends AbstractEventLoggingService[KyuubiSparkEvent] {
 
-  override def initialize(conf: KyuubiConf): Unit = {
+  override def initialize(conf: KyuubiConf): Unit = synchronized {
     conf.get(ENGINE_EVENT_LOGGERS)
       .map(EventLoggerType.withName)
       .foreach{
         case EventLoggerType.SPARK =>
-          
addEventLogger(SparkContextHelper.createSparkHistoryLogger(engine.spark.sparkContext))
+          addEventLogger(SparkContextHelper.createSparkHistoryLogger(spark))
         case EventLoggerType.JSON =>
-          val jsonEventLogger = new 
JsonEventLogger[KyuubiSparkEvent](KyuubiSparkUtil.engineId,
-            ENGINE_EVENT_JSON_LOG_PATH, 
engine.spark.sparkContext.hadoopConfiguration)
+          val jsonEventLogger = new JsonEventLogger[KyuubiSparkEvent](
+            spark.applicationAttemptId.getOrElse(spark.applicationId),
+            ENGINE_EVENT_JSON_LOG_PATH,
+            spark.hadoopConfiguration)
           addService(jsonEventLogger)
           addEventLogger(jsonEventLogger)
         case logger =>
@@ -49,12 +48,13 @@ class EventLoggingService(engine: SparkSQLEngine)
     super.initialize(conf)
   }
 
-  override def start(): Unit = {
-    _service = Some(this)
+  override def start(): Unit = synchronized {
     super.start()
+    // expose the event logging service only when the loggers successfully 
start
+    _service = Some(this)
   }
 
-  override def stop(): Unit = {
+  override def stop(): Unit = synchronized {
     _service = None
     super.stop()
   }
diff --git 
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EventLoggingServiceSuite.scala
 
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EventLoggingServiceSuite.scala
index b8926bb..0ab844b 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EventLoggingServiceSuite.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EventLoggingServiceSuite.scala
@@ -99,7 +99,7 @@ class EventLoggingServiceSuite extends WithSparkSQLEngine 
with JDBCTestUtils {
 
   test("statementEvent: generate, dump and query") {
     val statementEventPath = Paths.get(
-      logRoot, "spark_statement", s"day=$currentDate", engine.engineId + 
".json")
+      logRoot, "spark_statement", s"day=$currentDate", 
spark.sparkContext.applicationId + ".json")
     val sql = "select timestamp'2021-09-01';"
     withSessionHandle { (client, handle) =>
 

Reply via email to